标签
远程调用
字数
15373 字
阅读时间
82 分钟
介绍
基于websocket开发,添加自定义注解,解析发送的地址、类型(请求或响应)及数据,支持集群
自定义注解及枚举
java
import java.lang.annotation.*;
/**
* WebSocket 处理方法注解
*
* 业务实现样例,参照登录处理
*
* @author Brack.zhu
* @date 2019/12/3
*/
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface WsAction {
/**
* 处理方法 为空取方法名(长度小于20)
*/
String action();
/**
* 接口方法描述
*/
String desc() default "";
/**
* WS操作类型
* @return
*/
WsType type();
/**
* 权限 默认登录
*/
WsPermit permit() default WsPermit.LOGIN;
/**
* 消息失败处理类---熔断
* @return
*/
Class<?> fallback() default void.class;
}标注在发送的实体类上,发送时自动对请求对象进行封装
java
import java.lang.annotation.*;
/**
* WS VO对象 注解
* @author Brack.zhu
* @date 2019/12/4
*/
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface WsData {
/**
* 接口方法 为空取方法名(长度小于20)
*/
String action();
}权限类型枚举
java
/**
* WS接口权限
*/
public enum WsPermit {
//公开
PUBLIC,
//登录
LOGIN,
}通讯类型枚举
java
/**
* 请求类型
* @author Brack.zhu
* @date 2019/12/10
*/
public enum WsType {
//请求类型
req,
//响应类型
resp,
}基础服务
标注在ws请求映射的对应方法上
java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.commons.IErrorCode;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.commons.utils.UUIDUtils;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.boot.ReqWsActionOp;
import com.commnetsoft.ws.boot.WsActionBeanPostProcessor;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsReq;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.service.WsReceiveService;
import com.commnetsoft.ws.service.WsSendHelper;
import com.commnetsoft.ws.service.WsSessionService;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.websocket.*;
/**
* WebSocket抽象基础服务,对外提供WebSocket服务需要继承该抽象类
*
* @author Brack.zhu
* @date 2019/12/4
*/
public abstract class AbstractWsService {
/**
* WS协议 type key
*/
public final static String WS_TYPE_KEY = "type";
/**
* WS协议 ID key
*/
public final static String WS_ID_KEY = "id";
/**
* WS协议 Action key
*/
public final static String WS_ACTION_KEY = "action";
/**
* 登录操作名
*/
public final static String WS_LOGIN_ACTION_NAME = "login";
/**
* 心跳操作名
*/
public final static String WS_HEARTBEAT_ACTION_NAME = "heartbeat";
/**
* 未知操作名
*/
public final static String WS_UNKNOWN_ACTION_NAME = "unknown";
private Logger log = LoggerFactory.getLogger(AbstractWsService.class);
public WsActionBeanPostProcessor wsActionBeanPostProcessor() {
return SpringContextUtil.getBean(WsActionBeanPostProcessor.class);
}
public WsSessionService wsSessionService() {
return SpringContextUtil.getBean(WsSessionService.class);
}
public WsSendHelper wsSendHelper() {
return SpringContextUtil.getBean(WsSendHelper.class);
}
public WsReceiveService wsReceiveService() {
return SpringContextUtil.getBean(WsReceiveService.class);
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
log.info("新连接创建,{}!", WsUtil.toStr(session));
//设置最大超时时间--默认30秒
long maxIdleTimeout = 30000;
try {
WsConfig wsConfig = SpringContextUtil.getBean(WsConfig.class);
if (null != wsConfig) {
maxIdleTimeout = wsConfig.getWsSessionTimeout();
}
} catch (Exception e) {
log.warn("获取WS会话最大空闲时间参数异常,使用默认值:{}", maxIdleTimeout, e);
}
session.setMaxIdleTimeout(maxIdleTimeout);
//统计
wsSessionService().getLocal().openIncrement();
log.warn("OPEN 后 WS会话总数:{}", session.getOpenSessions().size());
//TODO 需完善管理OPEN后不处理会话;一段时间内未认证则关闭
}
/**
* 收到客户端消息后调用的方法
* 请求的数据进行响应,响应类型的数据进行解析
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
try {
log.info("B-S:{},{}", message, WsUtil.toStr(session));
JSONObject msgJson = JSON.parseObject(message);
if (null != msgJson) {
String type = msgJson.getString(WS_TYPE_KEY);
if (WsType.req.toString().equals(type)) {
onReqMessage(msgJson, session);
} else if (WsType.resp.toString().equals(type)) {
onRespMessage(msgJson, session);
}
}
} catch (MicroRuntimeException mre) {
log.error("WS onMessage MicroRuntimeException {},{}", message, WsUtil.toStr(session), mre);
WsResp wsResp = buildUnknownWsResp(mre);
wsSendHelper().sendWsResp(session, wsResp);
} catch (JSONException jsone) {
log.error("WS onMessage JSONException(协议错误) {},{}", message, WsUtil.toStr(session), jsone);
MicroRuntimeException microRuntimeException = new MicroRuntimeException(WsError.ws_invalid_msg, jsone);
WsResp wsResp = buildUnknownWsResp(microRuntimeException);
wsSendHelper().sendWsResp(session, wsResp);
} catch (Exception e) {
log.error("WS onMessage Exception {},{}", message, WsUtil.toStr(session), e);
MicroRuntimeException microRuntimeException = new MicroRuntimeException(WsError.ws_error, e);
WsResp wsResp = buildUnknownWsResp(microRuntimeException);
wsSendHelper().sendWsResp(session, wsResp);
}
}
/**
* 发生错误时调用
**/
@OnError
public void onError(Session session, Throwable error) {
log.info("连接异常!,{}", WsUtil.toStr(session), error);
}
/**
* 连接关闭调用的方法,无需调用关闭,调用方后自动触发关闭
*/
@OnClose
public void onClose(Session session) {
String uid = WsUtil.getLoginUid(session);
if (StringUtils.isNotEmpty(uid)) {
wsSessionService().removeAuthSession(uid);
log.info("认证连接关闭,uid:{},{}!", uid, WsUtil.toStr(session));
} else {
log.info("未认证连接关闭,{}!", WsUtil.toStr(session));
}
//统计
wsSessionService().getLocal().closeIncrement();
log.warn("CLOSE 后 WS会话总数:{}", session.getOpenSessions().size());
}
/**
* 接收请求消息
*
* @param msgJson
* @param session
*/
public void onReqMessage(JSONObject msgJson, Session session) {
String action = msgJson.getString(WS_ACTION_KEY);
//获取入参类型
ReqWsActionOp reqWsActionOp = wsActionBeanPostProcessor().req();
Class<?> dataVoClazz = reqWsActionOp.getDataVoClazz(action);
//回调对应实现方法
WsReq<?> wsReq = WsUtil.toWsReq(msgJson, dataVoClazz);
WsResp<?> wsResp = null;
if (!reqWsActionOp.isPub(action) && !WsUtil.isLogin(session)) {
//非公开方法且未登录返回异常,3秒后关闭会话
wsResp = WsResp.create(wsReq, WsError.ws_unauthorized);
} else {
wsResp = reqWsActionOp.invokeWsAction(wsReq, new LocalWsSession(session));
if (wsResp.successful() && WS_LOGIN_ACTION_NAME.equals(wsResp.getAction())) {
//登录处理成功
wsSessionService().addAuthSession(session);
}
}
wsSendHelper().sendWsResp(session, wsReq, wsResp);
}
/**
* 接收响应数据
*
* @param msgJson
* @param session
*/
public void onRespMessage(JSONObject msgJson, Session session) throws Exception {
wsReceiveService().onRespMessage(msgJson, new LocalWsSession(session));
}
/**
* 请求消息转换成WsReq对象
*
* @param message
* @param dataClazz 请求data转换成对象Class
* @param <T>
* @return
*/
public static <T> WsReq<T> toWsReq(String message, Class<T> dataClazz) {
try {
JSONObject object = JSON.parseObject(message);
return WsUtil.toWsReq(object, dataClazz);
} catch (JSONException je) {
throw new MicroRuntimeException(WsError.ws_invalid_msg);
}
}
/**
* 响应消息转换成WsResp对象
*
* @param msgJson
* @param dataClazz 响应data转换成对象Class
* @param <T>
* @return
*/
public static <T> WsResp<T> toWsResp(JSONObject msgJson, Class<T> dataClazz) {
return WsUtil.toWsResp(msgJson, dataClazz);
}
/**
* 构建一个未知响应消息,一般在错误协议是使用
*
* @param code
* @param <T>
* @return
*/
public static <T> WsResp<T> buildUnknownWsResp(IErrorCode code) {
WsReq<?> wsReq = new WsReq<>();
wsReq.setId(UUIDUtils.generate());
wsReq.setType(WsType.resp);
wsReq.setAction(WS_UNKNOWN_ACTION_NAME);
return WsResp.create(wsReq, code);
}
}配置
java
/**
* ws自定义配置<br/>
* 例如:@ServerEndpoint(value = "/ws/edcall.ws",configurator = CommnetServerEndpointConfig.class)<br/>
* 指定自定义配置后的WsSession对象中可以获取到请求端IP
*
* @author Brack.zhu
* @date 2020/9/15
*/
public class CommnetServerEndpointConfig extends DefaultServerEndpointConfigurator {
/**
* HTTP 头数据 获取请求方IP
*/
String HEADER_REAL_IP = "X-Real-IP";
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
super.modifyHandshake(sec, request, response);
String ip=request.getHeaders().get(HEADER_REAL_IP).toString();
sec.getUserProperties().put(WsUtil.KEY_HTTP_IP, ip);
}
}封装获取到的请求消息的实体类
java
import com.commnetsoft.ws.annotation.WsAction;
import com.commnetsoft.ws.annotation.WsType;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
/**
* 监听WebSocket协议业务处理组件
*
* @author Brack.zhu
* @date 2019/12/3
*/
@Component
public class WsActionBeanPostProcessor implements BeanPostProcessor {
private static ReqWsActionOp reqWsActionOp=new ReqWsActionOp();
private static RespWsActionOp respWsActionOp=new RespWsActionOp();
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
for (Method method : methods) {
WsAction wsAction = AnnotationUtils.findAnnotation(method, WsAction.class);
if (wsAction == null) {
continue;
}
String actionVal = wsAction.action();
WsType wsType=wsAction.type();
WsActionOpData wsActionOpData = new WsActionOpData();
wsActionOpData.setBean(bean);
wsActionOpData.setMethod(method);
wsActionOpData.setWsAction(wsAction);
if(WsType.req.equals(wsType)){
reqWsActionOp.put(actionVal, wsActionOpData);
}else if(WsType.resp.equals(wsType)){
respWsActionOp.put(actionVal,wsActionOpData);
}
}
return bean;
}
public ReqWsActionOp req(){
return reqWsActionOp;
}
public RespWsActionOp resp(){
return respWsActionOp;
}
}数据实体类
java
import com.commnetsoft.ws.annotation.WsAction;
import java.lang.reflect.Method;
/**
* WsActionOp相应操作对象
* @author Brack.zhu
* @date 2019/12/10
*/
public class WsActionOpData {
WsAction wsAction;
Object bean;
Method method;
public WsAction getWsAction() {
return wsAction;
}
public void setWsAction(WsAction wsAction) {
this.wsAction = wsAction;
}
public Object getBean() {
return bean;
}
public void setBean(Object bean) {
this.bean = bean;
}
public Method getMethod() {
return method;
}
public void setMethod(Method method) {
this.method = method;
}
}封装处理数据的抽象类
由该类实现 获取数据、获取权限、调用对应方法、对数据封装、熔断。
java
import com.commnetsoft.commons.Result;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.annotation.WsPermit;
import com.commnetsoft.ws.model.WsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
/**
* @author Brack.zhu
* @date 2019/12/10
*/
public abstract class AbstractWsActionOp {
private transient Logger log = LoggerFactory.getLogger(AbstractWsActionOp.class);
/**
* 获取请求类型指定WsActionOp对象
*
* @param wsAction
* @return
*/
public abstract WsActionOpData getWsActionOpData(String wsAction);
/**
* 获取指定WsAction对应的接口权限
*
* @param wsAction
* @return
*/
public WsPermit getWsPermit(String wsAction) {
WsActionOpData wsActionOpData = getWsActionOpData(wsAction);
if (null != wsActionOpData) {
return wsActionOpData.getWsAction().permit();
}
return null;
}
/**
* 指定WsAction对应的接口权限是否公开
*
* @param wsAction
* @return true 公开
*/
public boolean isPub(String wsAction) {
WsPermit apiPermit = getWsPermit(wsAction);
if (null != apiPermit) {
return WsPermit.PUBLIC.equals(apiPermit) ? true : false;
}
return false;
}
/**
* 获取反射方法形参类型数组
*
* @param wsAction
* @return
*/
public Class<?>[] getArgsClazz(String wsAction) {
WsActionOpData wsActionOpData = getWsActionOpData(wsAction);
if (null != wsActionOpData) {
return wsActionOpData.getMethod().getParameterTypes();
}
return new Class<?>[]{};
}
/**
* 获取反射方法形参类型数组,获取失败使用默认Void
*
* @param wsAction
* @return
*/
public Class<?> getDataVoClazz(String wsAction) {
Class<?> clazz=getDataVoClazzNull(wsAction);
if(null==clazz){
return Void.class;
}
return clazz;
}
/**
* 获取反射方法形参类型数组,获取失败使用默认Void
*
* @param wsAction
* @return
*/
public Class<?> getDataVoClazzNull(String wsAction) {
Class<?>[] argsClazz = getArgsClazz(wsAction);
if (null != argsClazz && argsClazz.length >= 1) {
Class<?> dataVoClazz = argsClazz[0];
if (!WsSession.class.isAssignableFrom(dataVoClazz)) {
return dataVoClazz;
}
}
return null;
}
/**
* 反射对应方法
* @param data
* @param wsSession
* @param action
* @param <T>
* @param <D>
* @return
*/
public <T, D> Result<T> invokeWsActionMethod(D data, WsSession wsSession, String action) {
Result<T> rs = null;
try {
WsActionOpData wsActionOp = getWsActionOpData(action);
if(null!=wsActionOp){
int parsLength = wsActionOp.getMethod().getParameterTypes().length;
if (1 == parsLength) {
rs = (Result<T>) wsActionOp.getMethod().invoke(wsActionOp.getBean(), wsSession);
} else if (2 == parsLength) {
rs = (Result<T>) wsActionOp.getMethod().invoke(wsActionOp.getBean(), data, wsSession);
} else {
log.error("WsAction 处理方法反射异常,实现类形参定义错误({}),{},{}", parsLength, data, wsSession);
return Result.create(WsError.ws_action_error);
}
}else{
log.error("WsAction 处理方法未找到{},{}", data, wsSession);
return Result.create(WsError.ws_action_notfound);
}
} catch (MicroRuntimeException mre) {
log.error("WsAction 处理方法反射异常,{},{}", data, wsSession, mre);
return Result.create(mre);
} catch (Exception e) {
log.error("WsAction 处理方法反射异常,{},{}", data, wsSession, e);
return Result.create(WsError.ws_action_error);
}
return rs;
}
/**
* 反射对应失败熔断方法
* @param data
* @param wsSession
* @param action
* @param <D>
* @return
*/
public <D> void invokeWsActionFallbackMethod(D data, WsSession wsSession, String action) {
try {
WsActionOpData wsActionOp = getWsActionOpData(action);
if(null!=wsActionOp){
Class<?> fallbackClazz=wsActionOp.getWsAction().fallback();
if(null==fallbackClazz){
return ;
}
Object fallbackBean=SpringContextUtil.getBean(fallbackClazz);
if(null==fallbackBean){
return;
}
Method method=wsActionOp.getMethod();
Class<?>[] paraClass= method.getParameterTypes();
Method fallbackMethod=fallbackClazz.getMethod(method.getName(),paraClass);
if(null==fallbackMethod){
return;
}
Result<?> rs = null;
int parsLength = paraClass.length;
if (1 == parsLength) {
rs=(Result<?>)fallbackMethod.invoke(fallbackBean,wsSession);
} else if (2 == parsLength) {
rs = (Result<?>) fallbackMethod.invoke(fallbackBean, data, wsSession);
} else {
log.error("WsAction 处理失败熔断方法反射异常,实现类形参定义错误({}),{},{}", parsLength, data, wsSession);
}
if(!rs.successful()) {
log.error("WsAction 处理失败熔断方法反射结果:{},{},{}",rs,data,wsSession);
}
}else{
log.error("WsAction 处理失败熔断方法未找到{},{}", data, wsSession);
}
} catch (MicroRuntimeException mre) {
log.error("WsAction 处理失败熔断方法反射异常,{},{}", data, wsSession, mre);
} catch (Exception e) {
log.error("WsAction 处理失败熔断方法反射异常,{},{}", data, wsSession, e);
}
}
}请求方式实现类
java
import com.commnetsoft.commons.Result;
import com.commnetsoft.ws.model.WsReq;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.model.WsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 请求类型操作
* @author Brack.zhu
* @date 2019/12/10
*/
public class ReqWsActionOp extends AbstractWsActionOp {
private transient Logger log = LoggerFactory.getLogger(ReqWsActionOp.class);
/**
* WsAction 请求类型相关数据
*/
private final transient Map<String, WsActionOpData> wsReqActions = new ConcurrentHashMap<>(256);
public WsActionOpData put(String wsAction, WsActionOpData wsActionOpData) {
return wsReqActions.put(wsAction, wsActionOpData);
}
public WsActionOpData remove(String wsAction) {
return wsReqActions.remove(wsAction);
}
@Override
public WsActionOpData getWsActionOpData(String wsAction) {
return wsReqActions.get(wsAction);
}
/**
* 调用WsAction对应方法
* @param req
* @param wsSession
* @param <T>
* @param <V>
* @return
*/
public <T,V> WsResp<T> invokeWsAction(WsReq<V> req, WsSession wsSession){
String action = req.getAction();
V reqData = req.getData();
Result<T> rs=invokeWsActionMethod(reqData,wsSession,action);
if(!rs.successful()){
log.error("WsAction 处理方法反射错误{},{},{}",req,wsSession,rs);
}
return WsResp.create(req,rs);
}
/**
* 调用WsAction对应失败熔断方法
* @param req
* @param wsSession
* @param <V>
* @return
*/
public <V> void invokeWsActionFallback(WsReq<V> req, WsSession wsSession){
String action = req.getAction();
V reqData = req.getData();
invokeWsActionFallbackMethod(reqData,wsSession,action);
}
}响应方式实现
java
import com.commnetsoft.commons.Result;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.WsReceiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 响应类型操作
* @author Brack.zhu
* @date 2019/12/10
*/
public class RespWsActionOp extends AbstractWsActionOp {
private transient Logger log = LoggerFactory.getLogger(RespWsActionOp.class);
/**
* WsAction 响应类型相关数据
*/
private final transient Map<String, WsActionOpData> wsRespActions = new ConcurrentHashMap<>(256);
public WsActionOpData put(String wsAction, WsActionOpData wsActionOpData) {
return wsRespActions.put(wsAction, wsActionOpData);
}
public WsActionOpData remove(String wsAction) {
return wsRespActions.remove(wsAction);
}
@Override
public WsActionOpData getWsActionOpData(String wsAction) {
return wsRespActions.get(wsAction);
}
/**
* 调用WsAction对应方法
* @param resp
* @param wsSession
* @param <T>
* @param <V>
*/
public <T,V> void invokeWsAction(WsResp<V> resp, WsSession wsSession){
String action = resp.getAction();
Result<T> rs=invokeWsActionMethod(resp,wsSession,action);
if(!rs.successful()){
log.error("WsAction 处理方法反射错误{},{},{}",resp,wsSession,rs);
}
}
/**
* 通用实现响应
* 响应对请求响应锁减一 唤醒响应线程
* @param <V>
* @param resp
* @return
*/
public <V> void generalRespAction(WsResp<V> resp) {
WsReceiveService wsReceiveService= SpringContextUtil.getBean(WsReceiveService.class);
wsReceiveService.respCountDownLatch(resp);
}
/**
* 重写
*/
@Override
public <T, D> Result<T> invokeWsActionMethod(D data, WsSession wsSession,String action) {
WsActionOpData wsActionOp = getWsActionOpData(action);
Result<T> result=null;
if(null!=wsActionOp) {
//自定义
WsResp<?> resp=( WsResp<?>)data;
result=super.invokeWsActionMethod(resp.getReqdata(),wsSession, action);
}
//使用通用实现
generalRespAction((WsResp<?>)data);
if(null==result){
result=Result.create();
}
return result;
}
/**
* 调用WsAction对应失败熔断方法
* @param resp
* @param wsSession
* @param <V>
* @return
*/
public <V> void invokeWsActionFallback(WsResp<V> resp, WsSession wsSession){
String action = resp.getAction();
V respData = resp.getResult();
invokeWsActionFallbackMethod(respData,wsSession,action);
}
}WS消息发送响应结果处理器
java
import com.alibaba.fastjson.JSON;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsReq;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.WsSessionService;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.Session;
/**
* WS消息发送响应结果处理器
* @author Brack.zhu
* @date 2019/12/10
*/
public class WsSendHandler<T,V> implements SendHandler {
private WsReq<T> req;
private WsResp<V> resp;
private Session session;
private static Logger log = LoggerFactory.getLogger(WsSendHandler.class);
private WsSendHandler() {
}
public WsSendHandler(WsReq<T> req,WsResp<V> resp, Session session) {
this.req=req;
this.resp=resp;
this.session=session;
}
@Override
public void onResult(SendResult result) {
if(result.isOK()) {
//成功
log.debug("S-B:{},{}", JSON.toJSONString(resp), WsUtil.toStr(session));
}else {
//失败熔断
log.debug("S-B:失败-req:{},resp:{},{}",JSON.toJSONString(req),JSON.toJSONString(resp), WsUtil.toStr(session));
WsActionBeanPostProcessor wsActionBeanPostProcessor=SpringContextUtil.getBean(WsActionBeanPostProcessor.class);
if(null==wsActionBeanPostProcessor){
log.warn("WsActionBeanPostProcessor 对象获取失败!");
return;
}
String uid=WsUtil.getLoginUid(session);
WsSessionService wsSessionService=SpringContextUtil.getBean(WsSessionService.class);
WsSession wsSession=wsSessionService.getLocal().getSession(uid);
if(null==wsSession){
wsSession=new LocalWsSession(session);
}
wsActionBeanPostProcessor.req().invokeWsActionFallback(req,wsSession);
}
}
}会话管理
会话相关实体类
消息接口
java
/**
* @author Brack.zhu
* @date 2020/12/14
*/
public interface WsMsg {
}请求消息模型
java
import com.commnetsoft.ws.annotation.WsType;
/**
* 请求消息模型
* @author Brack.zhu
* @date 2019/12/4
*/
public class WsReq<T> implements WsMsg {
private String id;
private WsType type = WsType.req;
private String action;
private T data;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public WsType getType() {
return type;
}
public void setType(WsType type) {
this.type = type;
}
}响应消息模型
java
import com.commnetsoft.commons.IErrorCode;
import com.commnetsoft.commons.Result;
import com.commnetsoft.ws.annotation.WsType;
import java.util.Objects;
/**
* 响应消息模型
* @author Brack.zhu
* @date 2019/12/4
*/
public class WsResp<T> extends Result<T> implements WsMsg {
private String id;
private WsType type= WsType.resp;
private String action;
private Object reqdata;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public Object getReqdata() {
return reqdata;
}
public void setReqdata(Object reqdata) {
this.reqdata = reqdata;
}
public WsType getType() {
return type;
}
public void setType(WsType type) {
this.type = type;
}
/**
* 创建一个正确的数据返回结果
*
* @param result 数据
*/
public static <T,V> WsResp<T> create(WsReq<V> req,Result<T> result) {
WsResp<T> rt = new WsResp<>();
rt.setId(req.getId());
rt.setAction(req.getAction());
rt.setCode(result.getCode());
rt.setMessage(result.getMessage());
rt.setDesc(result.getDesc());
rt.setResult(result.getResult());
return rt;
}
/**
* 创建一个错误的返回结果
*
* @param code 错误信息
*/
public static <T, V> WsResp<T> create(WsReq<V> req, IErrorCode code) {
Objects.requireNonNull(code, "错误的返回结果错误码不能为空");
WsResp<T> rt = new WsResp<>();
rt.setCode(code.getCode());
rt.setMessage(code.getMessage());
rt.setDesc(code.getDesc());
rt.setId(req.getId());
rt.setAction(req.getAction());
rt.setReqdata(req.getData());
return rt;
}
}请求响应闭锁
继承CountDownLatch,构造器传入一个值,当该值为0时其中方法才会执行。
即请求发出后,得到响应后值会减一,然后执行方法。若一直未响应则有超时时间
java
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.WsConfig;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 请求响应闭锁
* @author Brack.zhu
* @date 2020年11月19日
*/
public class ReqRespCountDownLatch extends CountDownLatch {
private WsResp<?> resp;
private Class<?> respDataType;
public ReqRespCountDownLatch(Class<?> respDataType) {
super(1);
this.respDataType=respDataType;
}
/**
* 同步等待请求响应
* @return
*/
@SuppressWarnings("unchecked")
public <T> WsResp<T> awaitResp() {
try {
WsConfig wsConfig = SpringContextUtil.getBean(WsConfig.class);
long awaitTime= wsConfig.getWsSyncSendTimeout();
super.await(awaitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//ignore
}
return (WsResp<T>)getResp();
}
public WsResp<?> getResp() {
return resp;
}
public void setResp(WsResp<?> resp) {
this.resp = resp;
}
public Class<?> getRespDataType() {
return respDataType;
}
}session接口
java
import com.commnetsoft.ws.service.comm.CommunicateType;
/**
* WS模块会话对象
*
* @author Brack.zhu
* @date 2020/12/24
*/
public abstract class WsSession {
private CommunicateType type;
private String uid;
public WsSession(CommunicateType type) {
this.type = type;
}
/**
* 是否认证成功的
*
* @return
*/
public abstract boolean isAuthed();
public CommunicateType getType() {
return type;
}
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
}本地session
java
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.ws.service.comm.CommunicateType;
import com.commnetsoft.ws.util.WsUtil;
import javax.websocket.Session;
/**
* 本地会话对象
* @author Brack.zhu
* @date 2020/12/24
*/
public class LocalWsSession extends WsSession {
private Session session;
public LocalWsSession(Session session) {
super(CommunicateType.Local);
this.session = session;
String uid = WsUtil.getLoginUid(session);
super.setUid(uid);
}
public Session getSession() {
return session;
}
@Override
public boolean isAuthed() {
return StringUtils.isNotBlank(getUid()) ? true : false;
}
}集群session
java
import com.commnetsoft.commons.utils.EncryptUtils;
import com.commnetsoft.ws.service.comm.CommunicateType;
import java.math.BigInteger;
/**
* 集群会话对象<br/>
*
* @author Brack.zhu
* @date 2020/12/24
*/
public class ClusterWsSession extends WsSession {
/**
* 用户连接所在服务器实例id
*/
private String serverInstanceId;
public ClusterWsSession(String uid,String serverInstanceId) {
super(CommunicateType.Cluster);
setUid(uid);
setServerInstanceId(serverInstanceId);
}
@Override
public boolean isAuthed() {
return true;
}
public String getServerInstanceId() {
return serverInstanceId;
}
public void setServerInstanceId(String serverInstanceId) {
this.serverInstanceId = serverInstanceId;
}
@Override
public int hashCode() {
String str = this.getUid() + this.getServerInstanceId();
String md5 = EncryptUtils.MD5.encrypt(str);
BigInteger bigInteger = new BigInteger(md5.getBytes());
return bigInteger.intValue();
}
@Override
public boolean equals(Object obj) {
if(obj instanceof ClusterWsSession){
ClusterWsSession clusterWsSession=(ClusterWsSession)obj;
String objStr = clusterWsSession.getUid() + clusterWsSession.getServerInstanceId();
String str = this.getUid() + this.getServerInstanceId();
if(objStr.equals(str)){
return true;
}
}
return false;
}
}Session管理
包含增加、获取、移除会话的方法。本地+集群
java
import com.commnetsoft.core.utils.ApplicationUtil;
import com.commnetsoft.ws.model.ClusterWsSession;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.session.ClusterWsSessionService;
import com.commnetsoft.ws.service.session.LocalWsSessionService;
import com.commnetsoft.ws.util.WsUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.websocket.Session;
/**
* WS会话管理服务
*
* @author Brack.zhu
* @date 2019/12/4
*/
@Service
public class WsSessionService {
@Autowired
private LocalWsSessionService localWsSessionService;
@Autowired
private ClusterWsSessionService clusterWsSessionService;
/**
* 增加验证成功会话
*
* @return
*/
public boolean addAuthSession(Session session) {
if (localWsSessionService.addSession(new LocalWsSession(session))) {
clusterWsSessionService.addSession(new ClusterWsSession(WsUtil.getLoginUid(session), ApplicationUtil.getInstanceId()));
return true;
}
return false;
}
/**
* 移除指定用户验证会话对象
*
* @param uid
* @return
*/
public WsSession removeAuthSession(String uid) {
WsSession localWsSession = localWsSessionService.removeSession(uid);
WsSession clusterWsSession = clusterWsSessionService.removeSession(uid);
return null != localWsSession ? localWsSession : clusterWsSession;
}
/**
* 获取指定用户验证会话对象
*
* @param uid
* @return
*/
public WsSession getAuthSession(String uid) {
WsSession wsSession = localWsSessionService.getSession(uid);
if (null != wsSession) {
return wsSession;
}
return clusterWsSessionService.getSession(uid);
}
public LocalWsSessionService getLocal() {
return localWsSessionService;
}
public ClusterWsSessionService getCluster() {
return clusterWsSessionService;
}
}Session管理抽象类
java
import com.commnetsoft.ws.model.WsSession;
/**
* 会话管理抽象类
* @author Brack.zhu
* @date 2020/12/28
*/
public abstract class AbstractWsSessionService {
/**
* 增加会话
* @param wsSession
* @return
*/
public abstract boolean addSession(WsSession wsSession);
/**
* 获取指定会话
* @param uid
* @return
*/
public abstract WsSession getSession(String uid);
/**
* 删除指定会话
* @param uid
* @return
*/
public abstract WsSession removeSession(String uid);
}本地session管理
添加成功后通过mq发送到集群添加集群session
java
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.core.mq.AmqpHelper;
import com.commnetsoft.ws.WsSessionEvents;
import com.commnetsoft.ws.WsSessionType;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.model.WsSessionChangeMsg;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* 本地会话管理实现
* @author Brack.zhu
* @date 2020/12/28
*/
@Component
public class LocalWsSessionService extends AbstractWsSessionService {
/**
* 认证会话关系<br/>
* 网络IO对象只能存储在本地
* key:uid
*/
private static ConcurrentHashMap<String, LocalWsSession> authUidSessionMap = new ConcurrentHashMap();
/**
* 认证会话关系<br/>
* key:id
* val:uid
*/
private static ConcurrentHashMap<String, String> authIdUidMap = new ConcurrentHashMap();
/**
* 打开会话总数
*/
private AtomicLong openCount = new AtomicLong(0);
/**
* 关闭会话总数
*/
private AtomicLong closeCount = new AtomicLong(0);
@Autowired
AmqpHelper amqpHelper;
private static Logger log = LoggerFactory.getLogger(LocalWsSessionService.class);
/**
* 会话打开统计数增加1
*
* @return
*/
public long openIncrement() {
long rs = openCount.incrementAndGet();
//统计日志输出
sessionInfo2Log();
return rs;
}
/**
* 获取会话打开统计数
*
* @return
*/
public long getOpenCount() {
return openCount.get();
}
/**
* 会话关闭统计数增加1
*
* @return
*/
public long closeIncrement() {
long rs = closeCount.incrementAndGet();
//统计日志输出
sessionInfo2Log();
return rs;
}
/**
* 获取关闭会话统计数
*
* @return
*/
public long getCloseCount() {
return closeCount.get();
}
/**
* 获取认证成功的会话数
*
* @return
*/
public long getAuthCount() {
return authUidSessionMap.mappingCount();
}
@Override
public synchronized boolean addSession(WsSession wsSession) {
LocalWsSession localWsSession=(LocalWsSession)wsSession;
String uid = wsSession.getUid();
if (StringUtils.isNotEmpty(uid)) {
log.info("增加认证成功会话:{},{}", uid, wsSession);
//是否存在老的会话
LocalWsSession oldSession = (LocalWsSession)getSession(uid);
if (null != oldSession && !localWsSession.getSession().equals(oldSession.getSession())) {
removeSession(uid);
log.warn("{}新认证Ws会话加入{},强制关闭老的会话:{}", uid, wsSession, oldSession);
WsUtil.closeSession(oldSession.getSession());
}
//认证对象
authUidSessionMap.put(uid, localWsSession);
String sessionId=localWsSession.getSession().getId();
authIdUidMap.put(sessionId, uid);
//发送WS会话有效消息
WsSessionChangeMsg wsSessionChangeMsg = new WsSessionChangeMsg();
wsSessionChangeMsg.setType(WsSessionType.VALID);
wsSessionChangeMsg.setId(sessionId);
wsSessionChangeMsg.setIdmId(uid);
amqpHelper.sendToExchange(WsSessionEvents.WS_SESSION_CHANGE, wsSessionChangeMsg);
//统计日志输出
sessionInfo2Log();
return true;
} else {
log.error("WsSession无用户UID信息,强制关闭会话:{}", wsSession);
WsUtil.closeSession(localWsSession.getSession());
}
return false;
}
public WsSession getSessionById(String sessionId) {
String uid = authIdUidMap.get(sessionId);
if (StringUtils.isBlank(uid)) {
return null;
}
return getSession(uid);
}
@Override
public WsSession getSession(String uid) {
return authUidSessionMap.get(uid);
}
@Override
public synchronized WsSession removeSession(String uid) {
LocalWsSession session = authUidSessionMap.remove(uid);
if (null == session) {
return null;
}
authIdUidMap.remove(session.getSession().getId());
//发送WS会话无效消息
WsSessionChangeMsg wsSessionChangeMsg = new WsSessionChangeMsg();
wsSessionChangeMsg.setType(WsSessionType.INVALID);
wsSessionChangeMsg.setId(session.getSession().getId());
wsSessionChangeMsg.setIdmId(uid);
amqpHelper.sendToExchange(WsSessionEvents.WS_SESSION_CHANGE, wsSessionChangeMsg);
return session;
}
/**
* 会话统计输出到日志信息
*
* @return
*/
public void sessionInfo2Log() {
log.info("WS 会话当前统计 open:{},close:{},atuh:{}", getOpenCount(), getCloseCount(), getAuthCount());
}
}集群session服务 通过一定规则构建key,并将key和服务的instanceid存入redis
获取通过key获取instanceid
java
import com.commnetsoft.core.CoreConstant;
import com.commnetsoft.core.utils.ApplicationUtil;
import com.commnetsoft.ws.WsConfig;
import com.commnetsoft.ws.model.ClusterWsSession;
import com.commnetsoft.ws.model.WsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @author Brack.zhu
* @date 2020/12/28
*/
@Component
public class ClusterWsSessionService extends AbstractWsSessionService {
@Value("#{coreConfig.applicationName}")
private String applicationName;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private WsConfig wsConfig;
/**
* 缓存有效期延长时间--毫秒
*/
private long cacheExpireLengthen = 30000;
/**
* 本地保存到集群缓存的用户集合
* key:uid
*/
private static Set<String> clusterUidsSets = Collections.synchronizedSet(new HashSet<>());
private static Logger log = LoggerFactory.getLogger(ClusterWsSessionService.class);
@PostConstruct
public void postConstruct() {
//启动集群会话维持看门狗
new Thread("ClusterWsSessionServiceLockWatchDogThread") {
@Override
public void run() {
while (true) {
try {
long wsClusterSessionKeepInterval=wsConfig.getWsClusterSessionKeepInterval();
Thread.sleep(wsClusterSessionKeepInterval);
} catch (InterruptedException e) {
//ignore
}
long start = System.currentTimeMillis();
extensionAllClusterSessionCacheExpire();
long end = System.currentTimeMillis();
long opTime = end - start;
if (opTime > cacheExpireLengthen) {
log.warn("缓存延续操作时间大于默认缓存有效期延长时间,为防止缓存被回收将cacheExpireLengthen({})值更新为:{}", cacheExpireLengthen, opTime);
cacheExpireLengthen = opTime;
}
}
}
}.start();
}
@Override
public boolean addSession(WsSession wsSession) {
String userId = wsSession.getUid();
if (addClusterSession(userId)) {
clusterUidsSets.add(userId);
return true;
} else {
return false;
}
}
@Override
public WsSession getSession(String uid) {
String clusterCacheKey = buildClusterSessionKey(uid);
try {
Object valObj = redisTemplate.opsForValue().get(clusterCacheKey);
if(null!=valObj){
ClusterWsSession wsSession = new ClusterWsSession(uid,String.valueOf(valObj));
return wsSession;
}
} catch (Exception e) {
log.error("获取WS集群环境中的会话连接实例信息异常,{}", clusterCacheKey, e);
}
return null;
}
@Override
public WsSession removeSession(String uid) {
clusterUidsSets.remove(uid);
WsSession wsSession = getSession(uid);
if (null != wsSession) {
String clusterCacheKey = buildClusterSessionKey(uid);
try {
redisTemplate.delete(clusterCacheKey);
} catch (Exception e) {
log.error("移除集群环境缓存中的WS会话连接实例信息异常,{}", clusterCacheKey, e);
}
}
return wsSession;
}
/**
* 构建WS会话连接信息集群缓存KEY
*
* @param userid
* @return
*/
public String buildClusterSessionKey(String userid) {
//key:项目名:wsClusterSessionInstance:userid:
String mark = CoreConstant.Cache.REDIS_KEY_MARK;
return applicationName + mark + "wsClusterSessionInstance" + mark + userid;
}
/**
* 获取集群回去缓存有效期--单位毫秒
*
* @return
*/
public long getClusterSessionCacheExpire() {
return wsConfig.getWsClusterSessionKeepInterval() + cacheExpireLengthen;
}
/**
* 延续所有集群会话缓存有效期
*/
private void extensionAllClusterSessionCacheExpire() {
Iterator<String> uids = clusterUidsSets.iterator();
while (uids.hasNext()) {
String uid = uids.next();
addClusterSession(uid);
}
}
/**
* 将WS会话连接实例信息增加到集群环境缓存中
*
* @param userid
*/
private boolean addClusterSession(String userid) {
String clusterCacheKey = buildClusterSessionKey(userid);
try {
redisTemplate.opsForValue().set(clusterCacheKey, ApplicationUtil.getInstanceId());
//设置有效期---集群会话看门狗维持
redisTemplate.expire(clusterCacheKey, getClusterSessionCacheExpire(), TimeUnit.MILLISECONDS);
return true;
} catch (Exception e) {
log.error("将WS会话连接对应实例信息增加到集群环境缓存中异常,{},{}", clusterCacheKey, ApplicationUtil.getInstanceId(), e);
}
return false;
}
}消息发送服务类
消息分发器,获取要发送消息的类型,发送普通消息、同步消息、响应消息及控制发送同步消息时接收到响应时取消线程阻塞
java
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.WsConfig;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.WsSessionService;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.locks.ReentrantLock;
/**
* WS消息分发器
*
* @author Brack.zhu
* @date 2020/12/9
*/
@Component
public class WsDispatch {
@Autowired
private LocalWsCommunicate localWsCommunicate;
@Autowired
private ClusterWsCommunicate clusterWsCommunicate;
@Autowired
private WsSessionService wsSessionService;
@Autowired
private WsConfig wsConfig;
private Logger log = LoggerFactory.getLogger(WsDispatch.class);
/**
* 被动超时MAP---没有线程主动清除超时对象,只有调用相关方法时被动超时清除-
* 没有线程同步
*/
private final ReentrantLock cdlLock = new ReentrantLock();
/**
* key:msgid
*/
private PassiveExpiringMap<String, ReqRespCountDownLatch> reqRespCdls;
@PostConstruct
public void postConstruct() {
long wsSyncSendTimeout = wsConfig.getWsSyncSendTimeout();
cdlLock.lock();
try {
long timeToLiveMillis = wsSyncSendTimeout + 30000;
log.info("请求响应消息CDL闭锁关系MAP有效期时间调整为:{}",timeToLiveMillis);
PassiveExpiringMap<String, ReqRespCountDownLatch> reqRespCDLsTemp = new PassiveExpiringMap<>(timeToLiveMillis);
if (null != reqRespCdls) {
reqRespCDLsTemp.putAll(reqRespCdls);
log.info("请求响应消息CDL闭锁关系MAP迁移数:{}",reqRespCdls.size());
}
reqRespCdls = reqRespCDLsTemp;
} catch (Exception e) {
log.error("", e);
} finally {
cdlLock.unlock();
}
}
public ReqRespCountDownLatch getReqRespCdl(String key) {
return reqRespCdls.get(key);
}
public void putReqRespCdl(String key, ReqRespCountDownLatch reqRespCdl) {
cdlLock.lock();
try {
reqRespCdls.put(key, reqRespCdl);
} catch (Exception e) {
log.error("", e);
} finally {
cdlLock.unlock();
}
}
public ReqRespCountDownLatch removeReqRespCdl(String key) {
cdlLock.lock();
try {
return reqRespCdls.remove(key);
} catch (Exception e) {
log.error("", e);
} finally {
cdlLock.unlock();
}
return null;
}
/**
* 参数变更事件监听
*
* @param envChangeEvent
*/
@EventListener
public void EnvironmentChangeEventListener(EnvironmentChangeEvent envChangeEvent) {
if (envChangeEvent.getKeys().contains("ws.sync.send.timeout")) {
postConstruct();
}
}
/**
* 响应请求
* 将请求响应闭锁线程推出阻塞
* 根据id唤醒同步请求并返回结果
*
* @param resp
* @param <V>
*/
public <V> void respCountDownLatch(WsResp<V> resp) {
ReqRespCountDownLatch cdl = removeReqRespCdl(resp.getId());
if (null != cdl) {
cdl.setResp(resp);
cdl.countDown();
}
}
/**
* 发送请求消息<br/>
* 支持集群
*
* @param uid 用户uid
* @param req 请求消息
* @param <T> 请求消息Data 模型
* @throws MicroRuntimeException
*/
public <T> void sendWsReq(String uid, WsReq<T> req) throws MicroRuntimeException {
CommunicateType commType = getSessionCommType(uid);
if (CommunicateType.Local.equals(commType)) {
localWsCommunicate.sendWsReq(uid, req);
} else if (CommunicateType.Cluster.equals(commType)) {
clusterWsCommunicate.sendWsReq(uid, req);
} else {
throw new MicroRuntimeException(WsError.ws_error, "无效通讯类型" + commType);
}
}
/**
* 发送同步请求消息<br/>
*支持集群
* @param uid
* @param req
* @param <T>
* @throws MicroRuntimeException
*/
public <T, R> WsResp<R> sendAndReceiveWsReq(String uid, WsReq<T> req, Class<R> respDataType) throws MicroRuntimeException {
CommunicateType commType = getSessionCommType(uid);
if (CommunicateType.Local.equals(commType)) {
ReqRespCountDownLatch cdl = new ReqRespCountDownLatch(respDataType);
String id = req.getId();
putReqRespCdl(id, cdl);
localWsCommunicate.sendWsReq(uid, req);
return cdl.awaitResp();
} else if (CommunicateType.Cluster.equals(commType)) {
ReqRespCountDownLatch cdl = new ReqRespCountDownLatch(respDataType);
String id = req.getId();
putReqRespCdl(id, cdl);
clusterWsCommunicate.sendWsReq(uid, req);
return cdl.awaitResp();
} else {
throw new MicroRuntimeException(WsError.ws_session_notfound, "无效通讯类型" + commType);
}
}
/**
* 发送响应消息<br/>
* 不支持集群发送响应消息
*
* @param uid
* @param resp
* @param <T>
* @throws MicroRuntimeException
*/
public <T> void sendWsResp(String uid, WsResp<T> resp) throws MicroRuntimeException {
localWsCommunicate.sendWsResp(uid, resp);
}
/**
* 获取会话通讯类型
*
* @param uid
* @return
*/
public CommunicateType getSessionCommType(String uid) {
WsSession session = wsSessionService.getAuthSession(uid);
if (session instanceof LocalWsSession) {
return CommunicateType.Local;
} else if (session instanceof ClusterWsSession) {
return CommunicateType.Cluster;
}
throw new MicroRuntimeException(WsError.ws_session_notfound);
}
public LocalWsCommunicate getLocal() {
return localWsCommunicate;
}
public ClusterWsCommunicate getCluster() {
return clusterWsCommunicate;
}
}服务类型枚举 本地/集群
java
/**
* @author Brack.zhu
* @date 2020/12/24
*/
public enum CommunicateType {
Local,
Cluster,
;
}本地通讯服务类
本地会话的获取和发送消息
java
import com.alibaba.fastjson.JSON;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.WsSessionService;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.Session;
/**
* WS本地通讯
*
* @author Brack.zhu
* @date 2020/12/9
*/
@Component
public class LocalWsCommunicate implements IWsCommunicate {
@Autowired
private WsSessionService wsSessionService;
private Logger log = LoggerFactory.getLogger(LocalWsCommunicate.class);
@Override
public <T> void sendWsReq(String uid, WsReq<T> wsReq) throws MicroRuntimeException {
Session session = getSession(uid);
sendWsReq(session, wsReq);
}
public <T> void sendWsReq(Session session, WsReq<T> wsReq) throws MicroRuntimeException {
if (null == wsReq) {
throw new MicroRuntimeException(WsError.ws_invalid_msg);
}
WsUtil.checkSession(session);
try {
String text = JSON.toJSONString(wsReq);
WsUtil.sendText(session, text);
} catch (Exception e) {
log.error("S-B 请求消息异常:{},{},{}", WsUtil.getLoginUid(session), session, wsReq, e);
throw new MicroRuntimeException(WsError.ws_session_send_req_error, e);
}
}
public <T> void sendWsResp(String uid, WsResp<T> wsResp) throws MicroRuntimeException {
Session session = getSession(uid);
sendWsResp(session, wsResp);
}
public <T> void sendWsResp(Session session, WsResp<T> wsResp) throws MicroRuntimeException {
if (null == wsResp) {
throw new MicroRuntimeException(WsError.ws_invalid_msg);
}
WsUtil.checkSession(session);
try {
String text = JSON.toJSONString(wsResp);
WsUtil.sendText(session, text);
} catch (Exception e) {
log.error("S-B 响应消息异常:{},{},{}", WsUtil.getLoginUid(session), session, wsResp, e);
throw new MicroRuntimeException(WsError.ws_session_send_resp_error, e);
}
}
/**
* 根据用户UID获取本地WS会话对象
*
* @param uid
* @return
*/
private Session getSession(String uid) {
WsSession wsSession = wsSessionService.getLocal().getSession(uid);
if (wsSession instanceof LocalWsSession) {
LocalWsSession localWsSession = (LocalWsSession) wsSession;
return localWsSession.getSession();
}
return null;
}
}集群会话service 集群会话的获取和发送消息
集群发送消息通过mq发送到特定的服务里,由特定服务响应
java
import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.core.mq.AmqpHelper;
import com.commnetsoft.core.utils.ApplicationUtil;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.WsSessionEvents;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.WsSessionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 集群WS通讯
*
* @author Brack.zhu
* @date 2020/12/9
*/
@Component
public class ClusterWsCommunicate implements IWsCommunicate {
@Autowired
private WsSessionService wsSessionService;
@Autowired
private AmqpHelper amqpHelper;
private Logger log = LoggerFactory.getLogger(ClusterWsCommunicate.class);
@Override
public <T> void sendWsReq(String uid, WsReq<T> wsReq) throws MicroRuntimeException {
if (null == wsReq) {
throw new MicroRuntimeException(WsError.ws_invalid_msg);
}
String serviceInstanceId = getServerInstanceId(uid);
if (StringUtils.isBlank(serviceInstanceId)) {
throw new MicroRuntimeException(WsError.ws_session_notfound);
}
try {
WsClusterMsg wsClusterReqMsg = new WsClusterMsg();
wsClusterReqMsg.setUid(uid);
wsClusterReqMsg.setMsg(wsReq);
wsClusterReqMsg.setFrom(ApplicationUtil.getInstanceId());
if (null != wsReq.getData()) {
wsClusterReqMsg.setDataClassType(wsReq.getData().getClass());
}
amqpHelper.sendToExchange(buildMqRoutingKey(serviceInstanceId), wsClusterReqMsg);
} catch (Exception e) {
log.error("S-S 请求消息异常:{},{},{}", uid, serviceInstanceId, wsReq, e);
throw new MicroRuntimeException(WsError.ws_session_send_req_error, "集群发送请求消息失败", e);
}
}
/**
* 响应消息集群环境中同步 <br/>
* AS--->BS--req-->C--resp->BS--sync-->AS
* @param uid
* @param respJson
* @param serviceInstanceId
* @param <T>
* @throws MicroRuntimeException
*/
public <T> void wsRespClusterSync(String uid,JSONObject respJson,String serviceInstanceId) throws MicroRuntimeException {
if (null == respJson) {
throw new MicroRuntimeException(WsError.ws_invalid_msg);
}
try {
WsClusterMsg wsClusterRespMsg = new WsClusterMsg();
wsClusterRespMsg.setUid(uid);
wsClusterRespMsg.setMsg(respJson);
wsClusterRespMsg.setFrom(ApplicationUtil.getInstanceId());
amqpHelper.sendToExchange(buildMqRoutingKey(serviceInstanceId), wsClusterRespMsg);
} catch (Exception e) {
log.error("S-S 响应消息异常:{},{},{}", uid, serviceInstanceId, respJson, e);
throw new MicroRuntimeException(WsError.ws_session_send_req_error, "集群发送响应消息失败", e);
}
}
/**
* 构建TOPIC的 RoutingKey
*
* @param serviceInstanceId
* @return
*/
public static String buildMqRoutingKey(String serviceInstanceId) {
return WsSessionEvents.WS_SESSION_SEND_CLUSTER_ + serviceInstanceId;
}
/**
* 根据用户UID获取会话所在的服务实例
* @param uid
* @return
*/
private String getServerInstanceId(String uid){
WsSession wsSession=wsSessionService.getCluster().getSession(uid);
if(wsSession instanceof ClusterWsSession){
ClusterWsSession clusterWsSession=(ClusterWsSession)wsSession;
return clusterWsSession.getServerInstanceId();
}
return null;
}
}集群获取mq中需响应的信息
java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.WsConfig;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.WsReceiveService;
import com.commnetsoft.ws.service.WsSessionService;
import com.commnetsoft.ws.service.comm.WsDispatch;
import com.commnetsoft.ws.util.WsUtil;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Brack.zhu
* @date 2020/12/22
*/
@Component
public class ClusterWsMqListener {
@Autowired
private WsDispatch wsDispatch;
@Autowired
private WsReceiveService wsReceiveService;
@Autowired
private WsSessionService wsSessionService;
@Autowired
private WsConfig wsConfig;
/**
* 被动超时MAP---没有线程主动清除超时对象,只有调用相关方法时被动超时清除-
* 没有线程同步
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* key:msgid
* val:ServiceInstanceId 服务实例id
*/
private PassiveExpiringMap<String, String> reqRespClusterMaps;
private Logger log = LoggerFactory.getLogger(ClusterWsMqListener.class);
@PostConstruct
public void postConstruct() {
long wsSyncSendTimeout = wsConfig.getWsSyncSendTimeout();
lock.lock();
try {
long timeToLiveMillis = wsSyncSendTimeout + 30000;
log.info("WS集群MQ消息发送响应关联集合有效期时间调整为:{}",timeToLiveMillis);
PassiveExpiringMap<String, String> reqRespClusterMapsTemp = new PassiveExpiringMap<>(timeToLiveMillis);
if (null != reqRespClusterMaps) {
reqRespClusterMapsTemp.putAll(reqRespClusterMaps);
log.info("WS集群MQ消息发送响应关联集合迁移数:{}",reqRespClusterMaps.size());
}
reqRespClusterMaps = reqRespClusterMapsTemp;
} catch (Exception e) {
log.error("", e);
} finally {
lock.unlock();
}
}
public String getReqRespClusterMap(String msgid) {
return reqRespClusterMaps.get(msgid);
}
public void putReqRespClusterMap(String msgid, String uid) {
lock.lock();
try {
reqRespClusterMaps.put(msgid, uid);
} catch (Exception e) {
log.error("", e);
} finally {
lock.unlock();
}
}
public String removeReqRespClusterMap(String msgid) {
lock.lock();
try {
return reqRespClusterMaps.remove(msgid);
} catch (Exception e) {
log.error("", e);
} finally {
lock.unlock();
}
return null;
}
/**
* WS集群消息分发监听器--接收来自其他节点的消息
*
* @param msg MQ集群消息对象
*/
@RabbitListener(queues = "#{wsConfig.wsClusterQueueName()}")
public void wsClusterDispatchListener(WsClusterMsg msg) {
try {
log.info("接收到集群WS消息:{},{}", JSON.toJSONString(msg));
String uid = msg.getUid();
Object msgObj = msg.getMsg();
if (msgObj instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) msgObj;
String type = jsonObject.getString(AbstractWsService.WS_TYPE_KEY);
if (WsType.req.name().equals(type)) {
Class<?> dataClassType = msg.getDataClassType();
if (null == dataClassType) {
dataClassType = Void.class;
}
WsReq wsReq = WsUtil.toWsReq(jsonObject, dataClassType);
putReqRespClusterMap(wsReq.getId(),msg.getFrom());
wsDispatch.getLocal().sendWsReq(uid, wsReq);
} else if (WsType.resp.name().equals(type)) {
//消息
WsSession wsSession=wsSessionService.getCluster().getSession(uid);
wsReceiveService.onRespMessage(jsonObject,wsSession);
} else {
log.error("集群WS消息暂不支持的type类型:{}", type);
}
} else {
log.error("集群WS消息暂不支持的msg数据类型:{}", msgObj);
}
} catch (Exception e) {
log.error("WS集群消息分发监听处理异常,{}", msg, e);
}
}
/**
* 参数变更事件监听
*
* @param envChangeEvent
*/
@EventListener
public void EnvironmentChangeEventListener(EnvironmentChangeEvent envChangeEvent) {
if (envChangeEvent.getKeys().contains("ws.sync.send.timeout")) {
postConstruct();
}
}
}接收响应数据service
java
import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.boot.RespWsActionOp;
import com.commnetsoft.ws.boot.WsActionBeanPostProcessor;
import com.commnetsoft.ws.model.WsResp;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.comm.WsDispatch;
import com.commnetsoft.ws.service.mq.ClusterWsMqListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* WS消息接收处理服务
* @author Brack.zhu
* @date 2020/12/23
*/
@Component
public class WsReceiveService {
@Autowired
private WsDispatch wsDispatch;
@Autowired
private WsActionBeanPostProcessor wsActionBeanPostProcessor;
@Autowired
private WsSendHelper wsSendHelper;
@Autowired
private ClusterWsMqListener clusterWsMqListener;
private Logger log = LoggerFactory.getLogger(WsReceiveService.class);
/**
* 响应请求
* 根据id唤醒同步请求并返回结果
*
* @param resp
* @param <V>
*/
public <V> void respCountDownLatch(WsResp<V> resp) {
wsDispatch.respCountDownLatch(resp);
}
/**
* 接收响应数据
*
* @param msgJson
* @param wsSession
*/
public void onRespMessage(JSONObject msgJson, WsSession wsSession) throws Exception {
String id = msgJson.getString(AbstractWsService.WS_ID_KEY);
String clusterServiceInstanceId = clusterWsMqListener.getReqRespClusterMap(id);
if (StringUtils.isNotBlank(clusterServiceInstanceId)) {
//是否来自本服务会话响应集群消息 AS--req->BS--->C--resp->BS--sync->AS
//转发--集群处理响应消息
String uid = wsSession.getUid();
wsDispatch.getCluster().wsRespClusterSync(uid, msgJson,clusterServiceInstanceId);
} else {
//本地处理响应消息--可能来自本地或者集群
String action = msgJson.getString(AbstractWsService.WS_ACTION_KEY);
//获取入参类型
RespWsActionOp respWsActionOp = wsActionBeanPostProcessor.resp();
Class<?> dataVoClazz = respWsActionOp.getDataVoClazzNull(action);
if (null == dataVoClazz) {
dataVoClazz = wsSendHelper.getReqCdlRespDataType(id);
if (null == dataVoClazz) {
dataVoClazz = Void.class;
}
}
//回调对应实现方法
WsResp<?> wsResp = AbstractWsService.toWsResp(msgJson, dataVoClazz);
onRespMessage(wsResp, wsSession);
}
}
/**
* 接收响应数据
*
* @param wsResp
* @param wsSession
*/
public void onRespMessage(WsResp<?> wsResp, WsSession wsSession) throws Exception {
String action = wsResp.getAction();
//获取入参类型
RespWsActionOp respWsActionOp = wsActionBeanPostProcessor.resp();
if (!respWsActionOp.isPub(action) && !wsSession.isAuthed()) {
//非公开方法且未登录
log.warn("响应消息{},于处理要求不一致(该响应需要登录),{}", wsResp, wsSession);
} else {
respWsActionOp.invokeWsAction(wsResp, wsSession);
}
}
}WS工具类
java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.annotation.WsData;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.model.WsReq;
import com.commnetsoft.ws.model.WsResp;
import org.apache.tomcat.websocket.WsSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotationUtils;
import javax.websocket.SendHandler;
import javax.websocket.Session;
/**
* WebSocket 工具类
*
* @author Brack.zhu
* @date 2019/12/3
*/
public class WsUtil {
private static Logger log = LoggerFactory.getLogger(WsUtil.class);
/**
* 会话属性-用户uid Key
*/
public static final String KEY_UID = "wsUid";
/**
* 会话属性-请求IP
*/
public static final String KEY_HTTP_IP = "httpIp";
/**
* 设置会话属性对象
*
* @param session
* @param key
* @param obj
*/
public static void setUserProperties(Session session, String key, Object obj) {
session.getUserProperties().put(key, obj);
}
/**
* 设置会话登录用户id属性
*
* @param session
* @return
*/
public static void setLoginUid(Session session, String uid) {
setUserProperties(session, KEY_UID, uid);
}
/**
* 获取会话属性对象
*
* @param session
* @param key
* @return
*/
public static Object getUserProperties(Session session, String key) {
return session.getUserProperties().get(key);
}
/**
* 获取会话登录用户id属性
*
* @param session
* @return
*/
public static String getLoginUid(Session session) {
Object uidObj = getUserProperties(session, KEY_UID);
if (null == uidObj) {
return null;
}
return String.valueOf(uidObj);
}
/**
* 判断会话是否登录
*
* @param session
* @return
*/
public static boolean isLogin(Session session) {
String uid = getLoginUid(session);
return StringUtils.isNotBlank(uid) ? true : false;
}
/**
* 获取指定wsDataCalzz 类的WsData Action值,无返回null
*
* @param wsDataCalzz
* @param <T>
* @return
*/
public static <T> String getWsDataAction(Class<T> wsDataCalzz) {
WsData wsData = AnnotationUtils.findAnnotation(wsDataCalzz, WsData.class);
if (null != wsData) {
return wsData.action();
}
return null;
}
/**
* 清除会话自定义属性
*
* @param session
*/
public static void clearUserProperties(Session session) {
if (null != session) {
session.getUserProperties().clear();
}
}
/**
* 关闭Ws会话,忽略会话关闭异常
*
* @param session
*/
public static void closeSession(Session session) {
if (null != session) {
try {
//自定义关闭---测试不推荐
// CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, null);
// if (session instanceof WsSession) {
// //WsSession 默认关闭方法 中 是不强制关闭 closeSocket 默认值false,需传入强制关闭
// WsSession wsSession = (WsSession) session;
// wsSession.onClose(closeReason);
// log.warn("强制关闭WsSession完成:{}", WsUtil.toStr(session));
// } else {
// log.warn("强制关闭session类型时,不是WsSession类型调用父类关闭方法:{}", WsUtil.toStr(session));
// session.close(closeReason);
// log.warn("强制关闭session完成:{}", WsUtil.toStr(session));
// }
session.close();
log.warn("强制关闭session完成:{}", WsUtil.toStr(session));
} catch (Exception e) {
log.error("closeSession Exception,{}", WsUtil.toStr(session), e);
}
}
}
/**
* 获取会话信息
*
* @param session
* @return
*/
public static String toStr(Session session) {
if (null != session) {
//base
StringBuilder sb = new StringBuilder();
sb.append("session[id:").append(session.getId())
.append(",hashCode:").append(Integer.toHexString(session.hashCode()))
;
if (session instanceof WsSession) {
WsSession wsSession = (WsSession) session;
sb.append(",httpSessionId:").append(wsSession.getHttpSessionId())
// .append(",httpIp:").append(getUserProperties(session, KEY_HTTP_IP));
;
}
sb.append("]");
return sb.toString();
}
return null;
}
/**
* 会话是否开启的
*
* @param session
* @return
*/
public static boolean isOpened(Session session) {
if (null != session && session.isOpen()) {
return true;
}
return false;
}
/**
* 检查WS会话对象
*
* @param session
* @throws MicroRuntimeException
*/
public static void checkSession(Session session) throws MicroRuntimeException {
if (null == session) {
throw new MicroRuntimeException(WsError.ws_session_notfound);
}
if (!WsUtil.isOpened(session)) {
throw new MicroRuntimeException(WsError.ws_session_invalid, "session信息:" + WsUtil.toStr(session));
}
}
/**
* WS发送文本
* @param session
* @param text
* @throws MicroRuntimeException
*/
public static void sendText(Session session,String text)throws MicroRuntimeException{
checkSession(session);
try {
session.getBasicRemote().sendText(text);
//成功
log.info("S-B:[{}]-{},{}", session.getId(), text, session);
} catch (Exception e) {
log.error("S-B 发送消息异常:{},{}", session, text, e);
throw new MicroRuntimeException(WsError.ws_session_send_error, e);
}
}
/**
* 异步发送WS消息
*
* @param message 消息内容
* @param session 会话
* @param completion 结果异步通知对象
*/
public static void asyncSendWsText(String message, Session session, SendHandler completion) throws MicroRuntimeException{
checkSession(session);
session.getAsyncRemote().sendText(message, completion);
}
/**
* 响应消息转换成WsResp对象
*
* @param msgJson
* @param dataClazz 响应data转换成对象Class
* @param <T>
* @return
*/
public static <T> WsResp<T> toWsResp(JSONObject msgJson, Class<T> dataClazz) throws MicroRuntimeException{
try {
String type = msgJson.getString(AbstractWsService.WS_TYPE_KEY);
if (WsType.resp.toString().equals(type)) {
return JSON.parseObject(msgJson.toJSONString(), new TypeReference<WsResp<T>>(dataClazz) {
});
} else {
throw new MicroRuntimeException(WsError.ws_invalid_type);
}
} catch (JSONException je) {
throw new MicroRuntimeException(WsError.ws_invalid_msg);
}
}
/**
* 请求消息转换成WsReq对象
*
* @param msgJson
* @param dataClazz 请求data转换成对象Class
* @param <T>
* @return
*/
public static <T> WsReq<T> toWsReq(JSONObject msgJson, Class<T> dataClazz) {
try {
String type = msgJson.getString(AbstractWsService.WS_TYPE_KEY);
if (WsType.req.toString().equals(type)) {
return JSON.parseObject(msgJson.toJSONString(), new TypeReference<WsReq<T>>(dataClazz) {
});
} else {
throw new MicroRuntimeException(WsError.ws_invalid_type);
}
} catch (JSONException je) {
throw new MicroRuntimeException(WsError.ws_invalid_msg);
}
}
}WS发送消息工具类
java
import com.alibaba.fastjson.JSON;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.commons.utils.UUIDUtils;
import com.commnetsoft.core.CommonError;
import com.commnetsoft.exception.MicroRuntimeException;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.boot.WsSendHandler;
import com.commnetsoft.ws.model.*;
import com.commnetsoft.ws.service.comm.IWsCommunicate;
import com.commnetsoft.ws.service.comm.LocalWsCommunicate;
import com.commnetsoft.ws.service.comm.WsDispatch;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.Session;
/**
* WS消息发送服务
*
* @author Brack.zhu
* @date 2020/3/24
*/
@Component
public class WsSendHelper {
private static Logger log = LoggerFactory.getLogger(WsSendHelper.class);
@Autowired
private WsDispatch wsDispatch;
/**
* 发送WS请求消息-支持集群<br/>
*
* @param uid 目标用户UID
* @param data 请求消息Data对象
* @throws MicroRuntimeException 发送WS消息异常
*/
public <T> void sendWsReq(String uid, T data) throws MicroRuntimeException {
if (StringUtils.isNotEmpty(uid) && null != data) {
String wsAction = WsUtil.getWsDataAction(data.getClass());
if (StringUtils.isEmpty(wsAction)) {
throw new MicroRuntimeException(WsError.ws_data_action_error);
}
WsReq<T> wsReq = new WsReq<>();
wsReq.setId(UUIDUtils.generate());
wsReq.setType(WsType.req);
wsReq.setAction(wsAction);
wsReq.setData(data);
sendWsReq(uid, wsReq);
}
}
/**
* 发送WS请求消息-支持集群
*/
public <T> void sendWsReq(String uid, WsReq<T> wsReq) throws MicroRuntimeException {
if (StringUtils.isNotEmpty(uid) && null != wsReq) {
wsDispatch.sendWsReq(uid, wsReq);
}
}
/**
* 同步发送请求消息;同步等待返回消息-支持集群
* 如果发送失败重试,重试间隔时间:3秒
* {@link #syncSendWsReq(String, WsReq, Class, int, long)}
*
* @param uid 消息接收者uid
* @param wsReq 发送请求消息
* @param respDataType 响应数据类型
* @param retryCount 失败重试次数
* @param <R> 请求消息数据类型
* @param <P> 响应消息数据类型
* @return
* @throws MicroRuntimeException
*/
public <R, P> WsResp<P> syncSendWsReq(String uid, WsReq<R> wsReq, Class<P> respDataType, int retryCount) throws MicroRuntimeException {
return syncSendWsReq(uid, wsReq, respDataType, retryCount, 3000L);
}
/**
* 同步发送请求消息;同步等待返回消息-支持集群
* 如果发送失败重试将重试。
* 发送失败定义:目标对象不存在(未登录)
*
* @param uid 消息接收者uid
* @param wsReq 发送请求消息
* @param respDataType 响应数据类型
* @param retryCount 失败重试次数
* @param retrySleepTime 失败重试间隔时间,单位毫秒
* @param <R> 请求消息数据类型
* @param <P> 响应消息数据类型
* @return
* @throws MicroRuntimeException
*/
public <R, P> WsResp<P> syncSendWsReq(String uid, WsReq<R> wsReq, Class<P> respDataType, int retryCount, long retrySleepTime) throws MicroRuntimeException {
if (StringUtils.isEmpty(uid) || null == wsReq) {
throw new MicroRuntimeException(CommonError.illegal_args);
}
int i = 0;
boolean retry = false;
do {
try {
WsResp<P> resp = wsDispatch.sendAndReceiveWsReq(uid, wsReq, respDataType);
return resp;
} catch (MicroRuntimeException e) {
if (WsError.ws_session_notfound.equals(e)) {
i++;
retry = true;
try {
Thread.sleep(retrySleepTime);
} catch (InterruptedException ex) {
//ignore
}
} else {
//向上抛异常
throw e;
}
}
} while (retry && i < retryCount);
throw new MicroRuntimeException(WsError.ws_error, "发送(重试)失败");
}
/**
* 同步发送请求消息;同步等待返回消息-支持集群
*
* @param uid 消息接收者uid
* @param wsReq 发送请求消息
* @param respDataType 响应数据类型
* @param <R> 请求消息数据类型
* @param <P> 响应消息数据类型
* @return
* @throws MicroRuntimeException
*/
public <R, P> WsResp<P> syncSendWsReq(String uid, WsReq<R> wsReq, Class<P> respDataType) throws MicroRuntimeException {
if (StringUtils.isEmpty(uid) || null == wsReq) {
throw new MicroRuntimeException(CommonError.illegal_args);
}
return wsDispatch.sendAndReceiveWsReq(uid, wsReq, respDataType);
}
/**
* 发送WS响应消息<br/>
* 仅支持本地发送
* 不会对发送结果进行逻辑处理,即不会产生熔断逻辑触发。
*/
public <V> void sendWsResp(String uid, WsResp<V> wsResp) throws MicroRuntimeException {
if (StringUtils.isNotEmpty(uid) && null != wsResp) {
wsDispatch.sendWsResp(uid, wsResp);
}
}
/**
* 发送WS响应消息<br/>
* 仅支持指定Session(本地发送)
* 不会对发送结果进行逻辑处理,即不会产生熔断逻辑触发。
*/
public <V> void sendWsResp(Session session, WsResp<V> wsResp) {
IWsCommunicate localWsComm = wsDispatch.getLocal();
if (localWsComm instanceof LocalWsCommunicate) {
LocalWsCommunicate localWsCommunicate = (LocalWsCommunicate) localWsComm;
localWsCommunicate.sendWsResp(session, wsResp);
} else {
log.warn("未知本地通讯实现,{}", localWsComm);
}
}
/**
* 发送WS响应消息<br/>
* 仅支持指定Session(本地发送)
* 会对发送结果进行逻辑处理,会产生熔断逻辑触发。
*/
public <T, V> void sendWsResp(Session session, WsReq<T> wsReq, WsResp<V> wsResp) {
if (null != wsResp) {
WsUtil.asyncSendWsText(JSON.toJSONString(wsResp), session, new WsSendHandler(wsReq, wsResp, session));
}
}
/**
* 根据响应ID获取对应的同步响应数据类型
*
* @param id
* @return
*/
public Class<?> getReqCdlRespDataType(String id) {
ReqRespCountDownLatch cdl = wsDispatch.getReqRespCdl(id);
if (null != cdl) {
return cdl.getRespDataType();
}
return null;
}
}登录,心跳检查
接口
java
import com.commnetsoft.commons.Result;
import com.commnetsoft.ws.model.WsSession;
/**
* 基础服务,登录,心跳检查 默认实现
* @author Brack.zhu
* @date 2019/12/10
*/
public interface IWsBaseService {
/**
* 登录请求处理
* @param wsLoginVo
* @param wsSession
* @return
*/
Result<Void> wsLoginReq(WsLoginVo wsLoginVo, WsSession wsSession);
/**
* 心跳检查请求处理
* @param wsHeartbeatVo
* @param wsSession
* @return
*/
Result<Void> wsHeartbeatReq(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession);
/**
* 心跳检查响应处理
* @param wsHeartbeatVo
* @param wsSession
* @return
*/
Result<Void> wsHeartbeatResp(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession);
}实现service
java
import com.alibaba.fastjson.JSONObject;
import com.commnetsoft.auth.api.v2.model.ValidDto;
import com.commnetsoft.auth.model.HttpRouteHead;
import com.commnetsoft.commons.Result;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.WsError;
import com.commnetsoft.ws.annotation.WsAction;
import com.commnetsoft.ws.annotation.WsPermit;
import com.commnetsoft.ws.annotation.WsType;
import com.commnetsoft.ws.feign.AuthApi;
import com.commnetsoft.ws.feign.ValidationApi;
import com.commnetsoft.ws.model.LocalWsSession;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.util.WsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.websocket.Session;
import java.util.Map;
/**
* WS基础业务处理服务</br>
* 登录</br>
* 心跳检测</br>
*
* @author Brack.zhu
* @date 2019/12/4
*/
@Service
public class WsBaseService implements IWsBaseService {
@Autowired
private AuthApi authApi;
@Autowired
private ValidationApi validationApi;
private Logger log = LoggerFactory.getLogger(WsBaseService.class);
@WsAction(action = AbstractWsService.WS_LOGIN_ACTION_NAME, desc = "WS登录验证请求处理", type = WsType.req, fallback = WsBaseServiceFallback.class, permit = WsPermit.PUBLIC)
@Override
public Result<Void> wsLoginReq(WsLoginVo wsLoginVo, WsSession wsSession) {
if(wsSession instanceof LocalWsSession){
LocalWsSession localWsSession=(LocalWsSession)wsSession;
String jwt = wsLoginVo.getJwt();
if (StringUtils.isNotBlank(jwt)) {
return wsLoginReqByJwt(jwt, localWsSession.getSession());
}
String username = wsLoginVo.getUsername();
String pwd = wsLoginVo.getPwd();
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(pwd)) {
return wsLoginReqByUsernamePwd(username, pwd, localWsSession.getSession());
}
return Result.create(WsError.ws_login_type_nonsupport);
}
return Result.create(WsError.ws_session_invalid,"非本地会话对象");
}
/**
* 根据JWT方式登录
*
* @param jwt
* @param session
* @return
*/
public Result<Void> wsLoginReqByJwt(String jwt, Session session) {
Result<Map<String, String>> result = authApi.jwtParse(jwt);
if (result.successful()) {
Object uidObj = result.getResult().get(HttpRouteHead.idmuid.getValue());
String uid = String.valueOf(uidObj);
WsUtil.setLoginUid(session, uid);
return Result.create();
} else {
return Result.create(result);
}
}
/**
* 根据用户名密码方式登录
*
* @param username
* @param pwd
* @param session
* @return
*/
public Result<Void> wsLoginReqByUsernamePwd(String username, String pwd, Session session) {
JSONObject login=new JSONObject();
login.put("username", username);
login.put("pwd",pwd);
Result<ValidDto> result = validationApi.valid(login);
if (result.successful()) {
Object uidObj = result.getResult().getUid();
String uid = String.valueOf(uidObj);
WsUtil.setLoginUid(session, uid);
return Result.create();
} else {
return Result.create(result);
}
}
@WsAction(action = AbstractWsService.WS_HEARTBEAT_ACTION_NAME, desc = "WS心跳检测请求", type = WsType.req, fallback = WsBaseServiceFallback.class)
@Override
public Result<Void> wsHeartbeatReq(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession) {
if (!wsSession.isAuthed()) {
log.warn("收到心跳消息,但是该会话已经不可用{}", wsSession);
}
if(wsSession instanceof LocalWsSession){
LocalWsSession localWsSession=(LocalWsSession)wsSession;
if (!localWsSession.getSession().isOpen()) {
log.warn("收到心跳消息,但是会话状态不是OPEN:{}", wsSession);
}
}
return Result.create();
}
@WsAction(action = AbstractWsService.WS_HEARTBEAT_ACTION_NAME, desc = "WS心跳检测响应", type = WsType.resp, fallback = WsBaseServiceFallback.class)
@Override
public Result<Void> wsHeartbeatResp(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession) {
if (!wsSession.isAuthed()) {
log.warn("响应心跳消息,但是该会话已经不可用{}", wsSession);
}
if(wsSession instanceof LocalWsSession){
LocalWsSession localWsSession=(LocalWsSession)wsSession;
if (!localWsSession.getSession().isOpen()) {
log.warn("响应心跳消息,但是会话状态不是OPEN:{}", wsSession);
}
}
return Result.create();
}
}service熔断类
java
import com.commnetsoft.commons.Result;
import com.commnetsoft.commons.utils.StringUtils;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.ws.model.WsSession;
import com.commnetsoft.ws.service.WsSessionService;
import org.springframework.stereotype.Component;
/**
* WS基础业务处理服务--熔断方法
* @author Brack.zhu
* @date 2019/12/9
*/
@Component
public class WsBaseServiceFallback implements IWsBaseService {
@Override
public Result<Void> wsLoginReq(WsLoginVo wsLoginVo, WsSession wsSession) {
//熔断错误业务逻辑---将认证成功的会话移除
String uid=wsSession.getUid();
if(StringUtils.isNotBlank(uid)){
WsSessionService wsSessionService= SpringContextUtil.getBean(WsSessionService.class);
if (null!=wsSessionService){
wsSessionService.removeAuthSession(uid);
}
}
//该结果不会返回给浏览器,一般定义标准成功即可
return Result.create();
}
@Override
public Result<Void> wsHeartbeatReq(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession) {
return Result.create();
}
@Override
public Result<Void> wsHeartbeatResp(WsHeartbeatVo wsHeartbeatVo, WsSession wsSession) {
return Result.create();
}
}实体类
java
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.annotation.WsData;
/**
* WS心跳包
* @author Brack.zhu
* @date 2019/12/9
*/
@WsData(action = AbstractWsService.WS_HEARTBEAT_ACTION_NAME)
public class WsHeartbeatVo {
}
import com.commnetsoft.ws.AbstractWsService;
import com.commnetsoft.ws.annotation.WsData;
/**
* Ws登录数据模型
* 登录支持3种方式
* 1:JWT
* 2: 用户名/密码
* 3:票据 --暂时未实现
* @author Brack.zhu
* @date 2019/12/4
*/
@WsData(action = AbstractWsService.WS_LOGIN_ACTION_NAME)
public class WsLoginVo {
/**
* JWT
*/
private String jwt;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String pwd;
public String getJwt() {
return jwt;
}
public void setJwt(String jwt) {
this.jwt = jwt;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPwd() {
return pwd;
}
public void setPwd(String pwd) {
this.pwd = pwd;
}
}客户端使用
实体类/枚举
import java.lang.annotation.*;
import cn.edcall.module.device.ws.WsType;
/**
* WebSocket 处理方法注解
*
* 业务实现样例,参照登录处理
*
* @author Brack.zhu
* @date 2019/12/3
*/
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface WsAction {
/**
* 处理方法 为空取方法名(长度小于20)
*/
String action();
/**
* 接口方法描述
*/
String desc();
/**
* WS操作类型
* @return
*/
WsType type();
// /**
// * 权限 默认登录
// */
// WsPermit permit() default WsPermit.LOGIN;
// /**
// * 消息失败处理类---熔断
// * @return
// */
// Class<?> fallback() default void.class;
}该注解标识有接收ws请求的类
java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* WS消息处理器注解
* @author Brack.zhu
* @date 2020年11月19日
*/
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface WsHandler {
}java
/**
* IOT服务能力触发WS消息模型
* @author Brack.zhu
* @date 2020年11月20日
*/
public class WsIotServiceCallReqVo {
/**
* iot 注册时返回的 ID
*/
private String iotid;
/**
* 约定服务能力
*/
private String service;
public String getIotid() {
return iotid;
}
public void setIotid(String iotid) {
this.iotid = iotid;
}
public String getService() {
return service;
}
public void setService(String service) {
this.service = service;
}
}java
public class WsLoginReqVo {
// private String jwt;
//
// public String getJwt() {
// return jwt;
// }
//
// public void setJwt(String jwt) {
// this.jwt = jwt;
// }
private String username;
private String pwd;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPwd() {
return pwd;
}
public void setPwd(String pwd) {
this.pwd = pwd;
}
}java
import cn.edcall.module.device.ws.WsType;
/**
* 请求消息模型
* @author Brack.zhu
* @date 2019/12/4
*/
public class WsReq<T> {
private String id;
private WsType type = WsType.req;
private String action;
private T data;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public WsType getType() {
return type;
}
public void setType(WsType type) {
this.type = type;
}
}java
import java.util.Objects;
import cn.edcall.module.device.exception.IErrorCode;
import cn.edcall.module.device.ws.Result;
import cn.edcall.module.device.ws.WsType;
/**
* 响应消息模型
* @author Brack.zhu
* @date 2019/12/4
*/
public class WsResp<T> extends Result<T> {
private String id;
private WsType type= WsType.resp;
private String action;
private Object reqdata;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public Object getReqdata() {
return reqdata;
}
public void setReqdata(Object reqdata) {
this.reqdata = reqdata;
}
public WsType getType() {
return type;
}
public void setType(WsType type) {
this.type = type;
}
/**
* 创建一个正确的数据返回结果
*
* @param result 数据
*/
public static <T,V> WsResp<T> create(WsReq<V> req,Result<T> result) {
WsResp<T> rt = new WsResp<>();
rt.setId(req.getId());
rt.setAction(req.getAction());
rt.setCode(result.getCode());
rt.setMessage(result.getMessage());
rt.setDesc(result.getDesc());
rt.setResult(result.getResult());
return rt;
}
/**
* 创建一个错误的返回结果
*
* @param code 错误信息
*/
public static <T, V> WsResp<T> create(WsReq<V> req, IErrorCode code) {
Objects.requireNonNull(code, "错误的返回结果错误码不能为空");
WsResp<T> rt = new WsResp<>();
rt.setCode(code.getCode());
rt.setMessage(code.getMessage());
rt.setDesc(code.getDesc());
rt.setId(req.getId());
rt.setAction(req.getAction());
rt.setReqdata(req.getData());
return rt;
}
}java
/**
* 请求类型
* @author Brack.zhu
* @date 2019/12/10
*/
public enum WsType {
//请求类型
req,
//响应类型
resp,
}java
import java.lang.reflect.Method;
import cn.edcall.module.device.ws.annotation.WsAction;
/**
* WsActionOp相应操作对象
* @author Brack.zhu
* @date 2019/12/10
*/
public class WsActionOpData {
WsAction wsAction;
Object bean;
Method method;
public WsAction getWsAction() {
return wsAction;
}
public void setWsAction(WsAction wsAction) {
this.wsAction = wsAction;
}
public Object getBean() {
return bean;
}
public void setBean(Object bean) {
this.bean = bean;
}
public Method getMethod() {
return method;
}
public void setMethod(Method method) {
this.method = method;
}
}请求响应闭锁
java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import cn.edcall.module.device.ws.model.WsResp;
/**
* 请求响应闭锁
* @author Brack.zhu
* @date 2020年11月19日
*/
public class ReqRespCountDownLatch extends CountDownLatch {
/**
* 同步请求响应data数据类型
*/
private Class<?> respDataType;
private WsResp<?> resp;
public ReqRespCountDownLatch(Class<?> respDataType) {
super(1);
this.respDataType=respDataType;
}
/**
* 同步等待请求响应
* @return
*/
@SuppressWarnings("unchecked")
public <T> WsResp<T> awaitResp() {
try {
super.await(WsClientModule.REQ_RESP_TIME_OUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//ignore
}
return (WsResp<T>)getResp();
}
public WsResp<?> getResp() {
return resp;
}
public void setResp(WsResp<?> resp) {
this.resp = resp;
}
public Class<?> getDataType() {
return respDataType;
}
}请求处理
java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edcall.module.device.exception.MicroRuntimeException;
import cn.edcall.module.device.ws.Result;
import cn.edcall.module.device.ws.WsError;
/**
* @author Brack.zhu
* @date 2019/12/10
*/
public abstract class AbstractWsActionOp {
private transient Logger log = LoggerFactory.getLogger(AbstractWsActionOp.class);
/**
* 获取请求类型指定WsActionOp对象
*
* @param wsAction
* @return
*/
public abstract WsActionOpData getWsActionOpData(String wsAction);
// /**
// * 获取指定WsAction对应的接口权限
// *
// * @param wsAction
// * @return
// */
// public WsPermit getWsPermit(String wsAction) {
// WsActionOpData wsActionOpData = getWsActionOpData(wsAction);
// if (null != wsActionOpData) {
// return wsActionOpData.getWsAction().permit();
// }
// return null;
// }
// /**
// * 指定WsAction对应的接口权限是否公开
// *
// * @param wsAction
// * @return true 公开
// */
// public boolean isPub(String wsAction) {
// WsPermit apiPermit = getWsPermit(wsAction);
// if (null != apiPermit) {
// return WsPermit.PUBLIC.equals(apiPermit) ? true : false;
// }
// return false;
// }
/**
* 获取反射方法形参类型数组
*
* @param wsAction
* @return
*/
public Class<?>[] getArgsClazz(String wsAction) {
WsActionOpData wsActionOpData = getWsActionOpData(wsAction);
if (null != wsActionOpData) {
return wsActionOpData.getMethod().getParameterTypes();
}
return new Class<?>[] {};
}
/**
* 获取反射方法形参类型数组,获取失败使用默认的Void类型
*
* @param wsAction
* @return
*/
public Class<?> getDataVoClazz(String wsAction) {
Class<?> clazz=getDataVoClazzNull(wsAction);
if(null==clazz) {
return Void.class;
}
return clazz;
}
/**
* 获取反射方法形参类型数组
*
* @param wsAction
* @return
*/
public Class<?> getDataVoClazzNull(String wsAction) {
Class<?>[] argsClazz = getArgsClazz(wsAction);
if (null != argsClazz && argsClazz.length >= 1) {
Class<?> dataVoClazz = argsClazz[0];
return dataVoClazz;
}
return null;
}
/**
* 反射对应方法
*
* @param data
* @param session
* @param action
* @param <T>
* @param <D>
* @return
*/
@SuppressWarnings("unchecked")
public <T, D> Result<T> invokeWsActionMethod(D data, String action) {
Result<T> rs = null;
try {
WsActionOpData wsActionOp = getWsActionOpData(action);
if (null != wsActionOp) {
int parsLength = wsActionOp.getMethod().getParameterTypes().length;
if (1 == parsLength) {
rs = (Result<T>) wsActionOp.getMethod().invoke(wsActionOp.getBean(), data);
} else {
log.error("WsAction 处理方法反射异常,实现类形参定义错误({}),{}", parsLength, data);
return Result.create(WsError.ws_action_error);
}
} else {
log.error("WsAction 处理方法未找到{},{}", data);
return Result.create(WsError.ws_action_notfound);
}
} catch (MicroRuntimeException mre) {
log.error("WsAction 处理方法反射异常,{},{}", data, mre);
return Result.create(mre);
} catch (Exception e) {
log.error("WsAction 处理方法反射异常,{},{}", data, e);
return Result.create(WsError.ws_action_error);
}
return rs;
}
//
// /**
// * 反射对应失败熔断方法
// *
// * @param data
// * @param session
// * @param action
// * @param <D>
// * @return
// */
// public <D> void invokeWsActionFallbackMethod(D data, Session session, String action) {
// try {
// WsActionOpData wsActionOp = getWsActionOpData(action);
// if (null != wsActionOp) {
// Class<?> fallbackClazz = wsActionOp.getWsAction().fallback();
// if (null == fallbackClazz) {
// return;
// }
// Object fallbackBean = SpringContextUtil.getBean(fallbackClazz);
// if (null == fallbackBean) {
// return;
// }
// Method method = wsActionOp.getMethod();
// Class<?>[] paraClass = method.getParameterTypes();
// Method fallbackMethod = fallbackClazz.getMethod(method.getName(), paraClass);
// if (null == fallbackMethod) {
// return;
// }
// Result<?> rs = null;
// int parsLength = paraClass.length;
// if (1 == parsLength) {
// rs = (Result<?>) fallbackMethod.invoke(fallbackBean, session);
// } else if (2 == parsLength) {
// rs = (Result<?>) fallbackMethod.invoke(fallbackBean, data, session);
// } else {
// log.error("WsAction 处理失败熔断方法反射异常,实现类形参定义错误({}),{},{}", parsLength, data, session);
// }
// if (!rs.successful()) {
// log.error("WsAction 处理失败熔断方法反射结果:{},{},{}", rs, data, session);
// }
// } else {
// log.error("WsAction 处理失败熔断方法未找到{},{}", data, session);
// }
// } catch (MicroRuntimeException mre) {
// log.error("WsAction 处理失败熔断方法反射异常,{},{}", data, session, mre);
// } catch (Exception e) {
// log.error("WsAction 处理失败熔断方法反射异常,{},{}", data, session, e);
// }
// }
}java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edcall.module.device.ws.Result;
import cn.edcall.module.device.ws.model.WsReq;
import cn.edcall.module.device.ws.model.WsResp;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 请求类型操作
* @author Brack.zhu
* @date 2019/12/10
*/
public class ReqWsActionOp extends AbstractWsActionOp {
private transient Logger log = LoggerFactory.getLogger(ReqWsActionOp.class);
/**
* WsAction 请求类型相关数据
*/
private final transient Map<String, WsActionOpData> wsReqActions = new ConcurrentHashMap<>(256);
public WsActionOpData put(String wsAction, WsActionOpData wsActionOpData) {
return wsReqActions.put(wsAction, wsActionOpData);
}
public WsActionOpData remove(String wsAction) {
return wsReqActions.remove(wsAction);
}
@Override
public WsActionOpData getWsActionOpData(String wsAction) {
return wsReqActions.get(wsAction);
}
/**
* 调用WsAction对应方法
* @param req
* @param <T>
* @param <V>
* @return
*/
public <T,V> WsResp<T> invokeWsAction(WsReq<V> req){
String action = req.getAction();
V reqData = req.getData();
Result<T> rs=invokeWsActionMethod(reqData,action);
if(!rs.successful()){
log.error("WsAction Req处理方法反射错误{},{}",req,rs);
}
return WsResp.create(req,rs);
}
// /**
// * 调用WsAction对应失败熔断方法
// * @param req
// * @param session
// * @param <V>
// * @return
// */
// public <V> void invokeWsActionFallback(WsReq<V> req, Session session){
// String action = req.getAction();
// V reqData = req.getData();
// invokeWsActionFallbackMethod(reqData,session,action);
// }
}java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commnetsoft.pub.pro.base.module.ModuleManager;
import cn.edcall.module.device.ws.Result;
import cn.edcall.module.device.ws.WsClientModule;
import cn.edcall.module.device.ws.model.WsResp;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 响应类型操作
* @author Brack.zhu
* @date 2019/12/10
*/
public class RespWsActionOp extends AbstractWsActionOp {
private transient Logger log = LoggerFactory.getLogger(RespWsActionOp.class);
/**
* WsAction 响应类型相关数据
*/
private final transient Map<String, WsActionOpData> wsRespActions = new ConcurrentHashMap<>(256);
public WsActionOpData put(String wsAction, WsActionOpData wsActionOpData) {
return wsRespActions.put(wsAction, wsActionOpData);
}
public WsActionOpData remove(String wsAction) {
return wsRespActions.remove(wsAction);
}
@Override
public WsActionOpData getWsActionOpData(String wsAction) {
WsActionOpData wsActionOpData=wsRespActions.get(wsAction);
return wsActionOpData;
}
/**
* 调用WsAction对应方法
* @param resp
* @param session
* @param <T>
* @param <V>
*/
public <T,V> void invokeWsAction(WsResp<V> resp){
String action = resp.getAction();
// V respResult = resp.getResult();
Result<T> rs=invokeWsActionMethod(resp,action);
if(!rs.successful()){
log.error("WsAction Resp处理方法反射错误{},{}",resp,rs);
}
}
/**
* 通用实现响应
* @param <T>
* @param <V>
* @param resp
* @return
*/
public <T,V> Result<T> generalRespAction(WsResp<V> resp) {
WsClientModule wsClientModule=ModuleManager.getInstance().get(WsClientModule.class);
wsClientModule.respCountDownLatch(resp);
return Result.create();
}
/**
* 重写
*/
@Override
public <T, D> Result<T> invokeWsActionMethod(D data, String action) {
WsActionOpData wsActionOp = getWsActionOpData(action);
if(null==wsActionOp) {
//使用通用实现
return generalRespAction((WsResp<?>)data);
}
Result<T> result=super.invokeWsActionMethod(data, action);
return result;
}
// /**
// * 调用WsAction对应失败熔断方法
// * @param resp
// * @param session
// * @param <V>
// * @return
// */
// public <V> void invokeWsActionFallback(WsResp<V> resp, Session session){
// String action = resp.getAction();
// V respData = resp.getResult();
// invokeWsActionFallbackMethod(respData,session,action);
// }
}WSClient
开启连接、结束、接收到消息对应的处理方法,会话开始、重连、销毁、判断准备是否完成、发送消息
java
import java.net.URI;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.commnetsoft.pub.pro.base.module.ModuleManager;
import cn.edcall.module.device.event.EventHelper;
import cn.edcall.module.device.exception.IErrorCode;
import cn.edcall.module.device.exception.MicroRuntimeException;
import cn.edcall.module.device.util.UUIDUtils;
import cn.edcall.module.device.ws.event.WsConnectionEvent;
import cn.edcall.module.device.ws.model.WsReq;
import cn.edcall.module.device.ws.model.WsResp;
import cn.edcall.module.device.ws.op.ReqWsActionOp;
import cn.edcall.module.device.ws.op.RespWsActionOp;
/**
* WS客户端
*
* @author Brack.zhu
* @date 2020年11月16日
*/
public class WsClient extends WebSocketClient {
private Logger log = LoggerFactory.getLogger(getClass());
/**
* WS协议 id key
*/
public final static String WS_ID_KEY = "id";
/**
* WS协议 type key
*/
public final static String WS_TYPE_KEY = "type";
/**
* WS协议 Action key
*/
public final static String WS_ACTION_KEY = "action";
/**
* WS协议 code key
*/
public final static String WS_CODE_KEY = "code";
/**
* 登录操作名
*/
public final static String WS_LOGIN_ACTION_NAME = "login";
/**
* 心跳包操作名
*/
public final static String WS_HEARTBEAT_ACTION_NAME = "heartbeat";
/**
* IOT服务注册操作名
*/
public final static String WS_IOT_REGISTER_ACTION_NAME = "iot_register";
/**
* IOT服务能力触发操作名
*/
public final static String WS_IOT_SERVICE_CALL_ACTION_NAME = "iot_service_call";
/**
* 未知操作名
*/
public final static String WS_UNKNOWN_ACTION_NAME = "unknown";
public WsClient(URI serverUri) {
super(serverUri);
}
@Override
public void onOpen(ServerHandshake handshakedata) {
log.info("WS与服务端开始创建会话 {}!", clinetInfo());
}
@Override
public void onMessage(String message) {
log.info("WS接收消息 ,{}, {}!", message, clinetInfo());
try {
JSONObject msgJson = JSON.parseObject(message);
if (null != msgJson) {
String type = msgJson.getString(WS_TYPE_KEY);
if (WsType.req.toString().equals(type)) {
onReqMessage(msgJson);
} else if (WsType.resp.toString().equals(type)) {
onRespMessage(msgJson);
}
}
} catch (MicroRuntimeException mre) {
log.error("WS onMessage MicroRuntimeException {},{}", message, clinetInfo(), mre);
WsResp<?> wsResp = buildUnknownWsResp(mre);
sendWsResp(wsResp);
} catch (JSONException jsone) {
log.error("WS onMessage JSONException(协议错误) {},{}", message, clinetInfo(), jsone);
MicroRuntimeException microRuntimeException = new MicroRuntimeException(WsError.ws_invalid_msg, jsone);
WsResp<?> wsResp = buildUnknownWsResp(microRuntimeException);
sendWsResp(wsResp);
} catch (Exception e) {
log.error("WS onMessage Exception {},{}", message, clinetInfo(), e);
MicroRuntimeException microRuntimeException = new MicroRuntimeException(WsError.ws_handle_msg_unknown_error, e);
WsResp<?> wsResp = buildUnknownWsResp(microRuntimeException);
sendWsResp(wsResp);
}
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.warn("WS会话关闭code:{},reason:{},remote:{},{}!", code, reason, remote, clinetInfo());
// 重连
WsClientModule wsClientModule = ModuleManager.getInstance().get(WsClientModule.class);
wsClientModule.wsReconnWake();
}
@Override
public void onError(Exception ex) {
log.error("WS会话异常{}", clinetInfo(), ex);
}
/**
* 同步等待连接 最长等待30秒
*/
private void awaitConn() {
for (int i = 0; i < 10; i++) {// 等待30秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
if (clientIsOk()) {
break;
}
}
}
/**
* 会话启动生效
*
* @return
*/
public boolean clientStart() {
try {
this.connect();
if (!clientIsOk()) {
log.info("等待连接成功...");
awaitConn();
if (!clientIsOk()) {
log.warn("连接失败...");
return false;
}
}
// 连接成功事件
EventHelper.publish(new WsConnectionEvent());
log.info("连接成功...");
return true;
} catch (Exception e) {
log.error("会话启动连接异常:", e);
}
return false;
}
/**
* 会话重连
*
* @return
*/
public boolean clientReConnect() {
log.info("WS会话重连,重连前的会话信息 :{}", clinetInfo());
boolean rs = false;
try {
boolean reconnRs = reconnectBlocking();
if (reconnRs) {
awaitConn();
rs = clientIsOk();
if (rs) {
// 重新连接成功事件
EventHelper.publish(new WsConnectionEvent());
}
}
} catch (Exception e) {
log.error("WS会话重连异常:", e);
}
log.info("WS会话重连结果:{}", rs);
return rs;
}
/**
* 会话关闭销毁
*/
public void clientDestroy() throws MicroRuntimeException {
log.info("WS会话销毁,销毁前的会话信息:{}", clinetInfo());
this.close();
}
/**
* 通道是否准备完成
*
* @return
*/
public boolean clientIsOk() {
return getReadyState().equals(ReadyState.OPEN);
}
/**
* 会话信息
*
* @return
*/
public String clinetInfo() {
StringBuffer sb = new StringBuffer();
sb.append("WsInfo{");
sb.append("state:").append(super.getReadyState()).append(";");
sb.append("SSL:").append(super.hasSSLSupport()).append(";");
sb.append("}");
return sb.toString();
}
/**
* 接收请求消息
*
* @param msgJson
* @param session
*/
public void onReqMessage(JSONObject msgJson) throws MicroRuntimeException {
String action = msgJson.getString(WS_ACTION_KEY);
WsClientModule wsClientModule = ModuleManager.getInstance().get(WsClientModule.class);
// 获取入参类型
ReqWsActionOp reqWsActionOp = wsClientModule.getReqWsActionOp();
if (null == reqWsActionOp) {
log.warn("未找到对应请求处理方法:{}", msgJson.toJSONString());
return;
}
Class<?> dataVoClazz = reqWsActionOp.getDataVoClazz(action);
// 回调对应实现方法
WsReq<?> wsReq = toWsReq(msgJson, dataVoClazz);
WsResp<?> wsResp = reqWsActionOp.invokeWsAction(wsReq);
sendWsResp(wsResp);
}
/**
* 接收响应数据
*
* @param msgJson
* @param session
*/
public void onRespMessage(JSONObject msgJson) throws MicroRuntimeException {
String action = msgJson.getString(WS_ACTION_KEY);
// 获取入参类型
WsClientModule wsClientModule = ModuleManager.getInstance().get(WsClientModule.class);
RespWsActionOp respWsActionOp = wsClientModule.getRespWsActionOp();
if (null == respWsActionOp) {
log.warn("未找到对应响应处理方法:{}", msgJson.toJSONString());
return;
}
Class<?> dataVoClazz = respWsActionOp.getDataVoClazzNull(action);
if (null == dataVoClazz) {
String id = msgJson.getString(WS_ID_KEY);
dataVoClazz = wsClientModule.getReqRespCdlRespDataType(id);
if (null == dataVoClazz) {
dataVoClazz = Void.class;
}
}
WsResp<?> wsResp = toWsResp(msgJson, dataVoClazz);
// 回调对应实现方法
respWsActionOp.invokeWsAction(wsResp);
}
/**
* 构建一个未知响应消息,一般在错误协议是使用
*
* @param code
* @param <T>
* @return
*/
public <T> WsResp<T> buildUnknownWsResp(IErrorCode code) {
WsReq<?> wsReq = new WsReq<>();
wsReq.setId(UUIDUtils.generate());
wsReq.setType(WsType.resp);
wsReq.setAction(WS_UNKNOWN_ACTION_NAME);
return WsResp.create(wsReq, code);
}
/**
* 请求消息转换成WsReq对象
*
* @param msgJson
* @param dataClazz
* 请求data转换成对象Class
* @param <T>
* @return
*/
public <T> WsReq<T> toWsReq(JSONObject msgJson, Class<T> dataClazz) {
try {
String type = msgJson.getString(WS_TYPE_KEY);
if (WsType.req.toString().equals(type)) {
// 请求格式验证
if (!msgJson.containsKey(WS_ID_KEY)) {
throw new MicroRuntimeException(WsError.ws_invalid_msg, "请求消息中未包含id字段:" + msgJson.toJSONString());
}
if (!msgJson.containsKey(WS_ACTION_KEY)) {
throw new MicroRuntimeException(WsError.ws_invalid_msg, "请求消息中未包含action字段:" + msgJson.toJSONString());
}
return JSON.parseObject(msgJson.toJSONString(), new TypeReference<WsReq<T>>(dataClazz) {
});
} else {
throw new MicroRuntimeException(WsError.ws_invalid_type);
}
} catch (JSONException je) {
throw new MicroRuntimeException(WsError.ws_invalid_msg);
}
}
/**
* 响应消息转换成WsResp对象
*
* @param msgJson
* @param dataClazz
* 响应data转换成对象Class
* @param <T>
* @return
*/
public <T> WsResp<T> toWsResp(JSONObject msgJson, Class<T> dataClazz) {
try {
String type = msgJson.getString(WS_TYPE_KEY);
if (WsType.resp.toString().equals(type)) {
if (!msgJson.containsKey(WS_ID_KEY)) {
throw new MicroRuntimeException(WsError.ws_invalid_msg, "响应消息中未包含id字段:" + msgJson.toJSONString());
}
if (!msgJson.containsKey(WS_ACTION_KEY)) {
throw new MicroRuntimeException(WsError.ws_invalid_msg, "响应消息中未包含action字段:" + msgJson.toJSONString());
}
if (!msgJson.containsKey(WS_CODE_KEY)) {
throw new MicroRuntimeException(WsError.ws_invalid_msg, "响应消息中未包含code字段:" + msgJson.toJSONString());
}
return JSON.parseObject(msgJson.toJSONString(), new TypeReference<WsResp<T>>(dataClazz) {
});
} else {
throw new MicroRuntimeException(WsError.ws_invalid_type);
}
} catch (JSONException je) {
log.error("响应消息转换JSON异常,json:{},class:{}", msgJson.toJSONString(), dataClazz, je);
throw new MicroRuntimeException(WsError.ws_invalid_msg, je);
}
}
/**
* 发送WS请求消息
*/
public <T> void sendWsReq(WsReq<T> wsReq) throws MicroRuntimeException {
if (null != wsReq) {
if (!clientIsOk()) {
throw new MicroRuntimeException(WsError.ws_connection_unavailable);
}
try {
String text = JSON.toJSONString(wsReq);
this.send(text);
} catch (Exception e) {
throw new MicroRuntimeException(WsError.ws_send_req_error, e);
}
}
}
/**
* 发送WS响应消息<br/>
* 不会对发送结果进行逻辑处理,即不会产生熔断逻辑触发。
*/
public <V> void sendWsResp(WsResp<V> wsResp) {
if (null != wsResp) {
if (!clientIsOk()) {
throw new MicroRuntimeException(WsError.ws_connection_unavailable);
}
try {
String text = JSON.toJSONString(wsResp);
this.send(text);
} catch (Exception e) {
throw new MicroRuntimeException(WsError.ws_send_resp_error, e);
}
}
}
}WSClientModule
包含扫描wsHandle注解,获取ws接受请求ws的连接,将其存入map中,
ws会话连接、登录验证、启动重连和心跳线程、ws状态获取、重连线程唤醒、
发送同步请求,添加请求响应锁
java
import cn.edcall.module.device.DeviceBootstrap;
import cn.edcall.module.device.constant.Constants;
import cn.edcall.module.device.event.EventHelper;
import cn.edcall.module.device.exception.MicroRuntimeException;
import cn.edcall.module.device.util.ConfigUtil;
import cn.edcall.module.device.util.LogExtendUtil;
import cn.edcall.module.device.util.UUIDUtils;
import cn.edcall.module.device.ws.annotation.WsAction;
import cn.edcall.module.device.ws.annotation.WsHandler;
import cn.edcall.module.device.ws.event.WsConnectionInvalidEvent;
import cn.edcall.module.device.ws.event.WsLoginEvent;
import cn.edcall.module.device.ws.model.WsLoginReqVo;
import cn.edcall.module.device.ws.model.WsReq;
import cn.edcall.module.device.ws.model.WsResp;
import cn.edcall.module.device.ws.op.ReqWsActionOp;
import cn.edcall.module.device.ws.op.RespWsActionOp;
import cn.edcall.module.device.ws.op.WsActionOpData;
import com.commnetsoft.pub.pro.base.module.SimpleAbstractModule;
import com.commnetsoft.pub.pro.base.service.ServiceManager;
import com.commnetsoft.pub.util.common.StringUtil;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.List;
/**
* WS客户端通讯模块
*
* @author Brack.zhu
* @date 2020-11-13
*/
public class WsClientModule extends SimpleAbstractModule {
private String EDCALL_DOMAIN;
private WsClient wsClient;
private ReqWsActionOp reqWsActionOp = new ReqWsActionOp();
private RespWsActionOp respWsActionOp = new RespWsActionOp();
private WsHeartbeatThread wsHeartbeatThread = new WsHeartbeatThread();
private ClientReconnectThread clientReconnectThread = new ClientReconnectThread();
/**
* 请求响应超时时间---debug时可以设长点
*/
public static long REQ_RESP_TIME_OUT = 30 * 1000;
/**
* 被动超时MAP---没有线程主动清除超时对象,只有调用相关方法时被动超时清除
* 存数key 和请求响应锁的对象
*/
private PassiveExpiringMap<String, ReqRespCountDownLatch> reqRespCDL = new PassiveExpiringMap<String, ReqRespCountDownLatch>(REQ_RESP_TIME_OUT);
private Logger log = LoggerFactory.getLogger(getClass());
@Override
public boolean init0() {
LogExtendUtil.consoleLogInfoPrint(log, "----------WS客户端通讯模块----------");
EDCALL_DOMAIN = ConfigUtil.get(Constants.BASE_DOMAIN);
if (StringUtil.isEmpty(EDCALL_DOMAIN)) {
LogExtendUtil.consoleLogInfoPrint(log, "WS客户端通讯模块初始化失败,原因:获取edcall主机地址失败," + EDCALL_DOMAIN);
return false;
}
// 初始化ws消息处理方法
try {
initWsAction();
} catch (Exception e) {
LogExtendUtil.consoleLogErrorPrint(log, "WS消息处理方法初始化失败,", e);
return false;
}
try {
URI iotURI = new URI(buildIotWebsocketUrl());
LogExtendUtil.consoleLogInfoPrint(log, "WS连接地址:" + iotURI);
wsClient = new WsClient(iotURI);
} catch (Exception e) {
LogExtendUtil.consoleLogErrorPrint(log, "WS客户端通讯模块初始化失败,原因:IOT WS通讯通道客户端创建失败," + buildIotWebsocketUrl(), e);
return false;
}
return true;
}
@Override
public boolean start0() {
if (!wsClient.clientStart()) {
LogExtendUtil.consoleLogInfoPrint(log, "WS客户端通讯模块启动失败,原因:IOT WS通讯通道客户端启动失败!");
return false;
}
try {
// ws登录
clientLogin();
// 启动连接重连线程
clientReconnectThread.start();
// 启动心跳检测
wsHeartbeatThread.start();
} catch (MicroRuntimeException mre) {
LogExtendUtil.consoleLogErrorPrint(log, "会话登录异常:", mre);
return false;
}
LogExtendUtil.consoleLogInfoPrint(log, "----------WS客户端通讯模块----------");
return true;
}
@Override
public boolean stop0() {
try {
wsClient.clientDestroy();
wsHeartbeatThread.interrupt();
} catch (MicroRuntimeException mre) {
LogExtendUtil.consoleLogErrorPrint(log, "会话登出异常:", mre);
// 销毁异常,也强制退出
}
return true;
}
/**
* WS通道是否准备完成
*
* @return
*/
public boolean isOk() {
return wsClient.clientIsOk();
}
/**
* WS通道重连线程唤醒---异步
*
* @return
*/
public void wsReconnWake() {
//连接不可用事件发布
EventHelper.publish(new WsConnectionInvalidEvent());
clientReconnectThread.interrupt();
}
/**
* 构建IOT WS地址
*
* @return
*/
public String buildIotWebsocketUrl() {
return buildIotWebsocketUrl(EDCALL_DOMAIN);
}
/**
* WS会话重连-包含业务
* 产生连接成功事件
* 重新认证
* 产生登录 成功事件
* @return
*/
public boolean wsClientReconn() {
LogExtendUtil.consoleLogInfoPrint(log,"----WS会话重连开始----");
boolean result=false;
if(wsClient.clientReConnect()) {
try {
clientLogin();
result=true;
}catch(Exception e) {
LogExtendUtil.consoleLogWarnPrint(log,"WS会话重连失败,原因:WS登录失败!");
}
}else {
LogExtendUtil.consoleLogWarnPrint(log,"WS会话重连失败,原因:通道重连失败!");
}
LogExtendUtil.consoleLogInfoPrint(log,"-----WS会话重连结束----");
return result;
}
/**
* 构建IOT WS地址
*
* @param host
* 主机域名/主地址
* @return
*/
public String buildIotWebsocketUrl(String host) {
String wsHost = host;
if (host.startsWith("https")) {
wsHost = host.replaceFirst("https", "wss");
} else {
wsHost = host.replaceFirst("http", "ws");
}
return wsHost + "/ws/edcall/iot.ws";
}
/**
* 初始化WS处理方法
*/
public void initWsAction() throws Exception {
List<Class<?>> clazzs = DeviceBootstrap.service.getClassByBasePackage();
if (null == clazzs) {
return;
}
for (Class<?> clazz : clazzs) {
WsHandler wsHandler = clazz.getAnnotation(WsHandler.class);
if (null == wsHandler) {
continue;
}
if(!ServiceManager.getInstance().checkConditionalOnService(clazz)) {
LogExtendUtil.consoleLogInfoPrint(log,clazz+"WsHandler未初始化,因为关联(ConditionalOnService)服务不存在!");
//关联对象未找到
continue;
}
Method[] methods = clazz.getMethods();
if (null == methods) {
continue;
}
Object wsHandlerObj = clazz.newInstance();
for (Method method : methods) {
WsAction wsAction = method.getAnnotation(WsAction.class);
if (null == wsAction) {
continue;
}
int parSize = method.getParameterTypes().length;
if (parSize > 1) {
throw new MicroRuntimeException(WsError.ws_action_pars_error);
}
WsType wsType = wsAction.type();
WsActionOpData wsActionOpData = new WsActionOpData();
wsActionOpData.setBean(wsHandlerObj);
wsActionOpData.setMethod(method);
wsActionOpData.setWsAction(wsAction);
if (WsType.req.equals(wsType)) {
reqWsActionOp.put(wsAction.action(), wsActionOpData);
} else if (WsType.resp.equals(wsType)) {
respWsActionOp.put(wsAction.action(), wsActionOpData);
}
}
}
}
/**
* 同步发送请求消息
*
* @param <R>
* @param <P>
* @param wsReq
* @return 返回结果中为Void模型
* @throws MicroRuntimeException
*/
public <R> WsResp<Void> syncSendWsReq(WsReq<R> wsReq) throws MicroRuntimeException {
if (null == wsReq) {
return null;
}
return syncSendWsReq(wsReq, Void.class);
}
/**
* 同步发送请求消息
*
* @param <R>
* @param <P>
* @param wsReq
* @return
* @throws MicroRuntimeException
*/
public <R, P> WsResp<P> syncSendWsReq(WsReq<R> wsReq,Class<P> respDataType) throws MicroRuntimeException {
if (null == wsReq) {
return null;
}
String id = wsReq.getId();
try {
ReqRespCountDownLatch cdl = new ReqRespCountDownLatch(respDataType);
reqRespCDL.put(id, cdl);
wsClient.sendWsReq(wsReq);
WsResp<P> resp = cdl.awaitResp();
return resp;
} catch (MicroRuntimeException mre) {
reqRespCDL.remove(id);
throw mre;
} catch (Exception e) {
reqRespCDL.remove(id);
throw new MicroRuntimeException(WsError.ws_error, e);
}
}
/**
* 发起请求数据
*
* @param <T>
* @param wsReq
*/
public <T> void send(WsReq<T> wsReq) {
wsClient.sendWsReq(wsReq);
}
/**
* 响应请求 根据id唤醒同步请求并返回结果
*
* @param wsResp
*/
public void respCountDownLatch(WsResp<?> wsResp) {
ReqRespCountDownLatch cdl = reqRespCDL.remove(wsResp.getId());
if (null != cdl) {
cdl.setResp(wsResp);
cdl.countDown();
}
}
/**
* 根据消息id获取响应闭锁中的响应类型
* @param id
* @return
*/
public Class<?> getReqRespCdlRespDataType(String id) {
ReqRespCountDownLatch cdl = reqRespCDL.get(id);
if (null != cdl) {
return cdl.getDataType();
}
return null;
}
/**
* 会话登录
*
* @param username
* @param pwd
* @throws MicroRuntimeException
*/
public void clientLogin() throws MicroRuntimeException {
String username = ConfigUtil.get(Constants.WS_USERNAME);
String pwd = ConfigUtil.get(Constants.WS_PWD);
LogExtendUtil.consoleLogInfoPrint(log, "WS会话开始登录," + username + "," + pwd);
try {
WsReq<WsLoginReqVo> loginReq = new WsReq<>();
loginReq.setId(UUIDUtils.generate());
loginReq.setAction(WsClient.WS_LOGIN_ACTION_NAME);
WsLoginReqVo loginVo = new WsLoginReqVo();
loginVo.setUsername(username);
loginVo.setPwd(pwd);
loginReq.setData(loginVo);
WsResp<Void> loginResp = syncSendWsReq(loginReq);
loginResp.tryData();
//登录成功事件
EventHelper.publish(new WsLoginEvent());
LogExtendUtil.consoleLogInfoPrint(log, "WS会话登录成功:" + clinetInfo());
} catch (MicroRuntimeException e) {
LogExtendUtil.consoleLogErrorPrint(log, "WS会话登录异常:", e);
throw e;
}
}
public String clinetInfo() {
return wsClient.clinetInfo();
}
public ReqWsActionOp getReqWsActionOp() {
return reqWsActionOp;
}
public RespWsActionOp getRespWsActionOp() {
return respWsActionOp;
}
public WsClient getWsClient() {
return wsClient;
}
}重连线程
java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commnetsoft.pub.pro.base.module.ModuleManager;
/**
* WS客户端会话重连线程
*
* @author Brack.zhu
* @date 2020年11月26日
*/
public class ClientReconnectThread extends Thread {
private Logger log = LoggerFactory.getLogger(getClass());
private boolean stopFlag = false;
private long succeedSleepTime=1000*60*10;
private long failSleepTime=1000*30;
public ClientReconnectThread() {
super("WsClient Reconnect Thread");
}
/**
* 设置停止标签
* 线程逻辑退出
*/
public void setStopFlag() {
this.stopFlag=true;
}
@Override
public void run() {
/log.warn("WS客户端会话重连线程启动!");
long sleepTime=succeedSleepTime;
while (!stopFlag) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
log.warn("WS客户端会话重连线程被唤醒!");
}
if(stopFlag) {
//线程停止
break;
}
WsClientModule wsClientModule=ModuleManager.getInstance().get(WsClientModule.class);
if(!wsClientModule.isOk()) {
if(wsClientModule.wsClientReconn()) {
log.info("WS客户端重连成功,"+wsClientModule.getWsClient().clinetInfo());
sleepTime=succeedSleepTime;
}else {
log.warn("WS客户端重连失败!"+wsClientModule.getWsClient().clinetInfo());
sleepTime=failSleepTime;
}
}
}
log.warn("WS客户端会话重连线程退出!");
}
}发送心跳包线程
java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commnetsoft.pub.pro.base.module.ModuleManager;
import cn.edcall.module.device.constant.Constants;
import cn.edcall.module.device.util.ConfigUtil;
import cn.edcall.module.device.util.UUIDUtils;
import cn.edcall.module.device.ws.model.WsReq;
import cn.edcall.module.device.ws.model.WsResp;
/**
* ws心跳定时发送线程
*
* @author Brack.zhu
* @date 2020年11月25日
*/
public class WsHeartbeatThread extends Thread {
private Logger log = LoggerFactory.getLogger(getClass());
/**
* 心跳间隔时间-默认15秒
*/
private long sleepInterval=1500*10;
public WsHeartbeatThread() {
super("Ws Heartbeat Thread");
sleepInterval=ConfigUtil.getLong(Constants.WS_HEARTBEAT_INTERVAL_MS, sleepInterval);
log.info("WS心跳间隔时间(毫秒):"+sleepInterval);
}
@Override
public void run() {
log.warn("ws心跳定时发送线程启动!");
while (true) {
heartbeat();
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
break;
}
}
log.warn("ws心跳定时发送线程退出!");
}
/**
* 会话心跳
*
* @param username
* @param pwd
* @throws MicroRuntimeException
*/
public void heartbeat() {
try {
WsClientModule wsClientModule=ModuleManager.getInstance().get(WsClientModule.class);
if(wsClientModule.isOk()) {
WsReq<Void> heartbeatReq = new WsReq<>();
heartbeatReq.setId(UUIDUtils.generate());
heartbeatReq.setAction(WsClient.WS_HEARTBEAT_ACTION_NAME);
WsResp<Void> heartbeatResp = wsClientModule.syncSendWsReq(heartbeatReq);
heartbeatResp.tryData();
}else {
log.warn("心跳包发送失败,连接不可用,{}",wsClientModule.clinetInfo());
}
} catch (Exception e) {
log.error("WS会话心跳异常:", e);
}
}
}